Repository: tajo
Updated Branches:
  refs/heads/master 4b1b7799d -> d261234ff


http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
new file mode 100644
index 0000000..6ab8574
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
@@ -0,0 +1,1227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.tajo.*;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.Bytes;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class FileTablespace extends Tablespace {
+  private final Log LOG = LogFactory.getLog(FileTablespace.class);
+
+  /** Prefix of the result file names built by getAppenderFilePath(). */
+  static final String OUTPUT_FILE_PREFIX="part-";
+  /**
+   * Per-thread formatter for the stage (execution block) part of a result file name:
+   * grouping is disabled and at least 2 integer digits are produced.
+   * A ThreadLocal is used so each thread gets its own NumberFormat instance.
+   */
+  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_STAGE =
+      new ThreadLocal<NumberFormat>() {
+        @Override
+        public NumberFormat initialValue() {
+          NumberFormat fmt = NumberFormat.getInstance();
+          fmt.setGroupingUsed(false);
+          fmt.setMinimumIntegerDigits(2);
+          return fmt;
+        }
+      };
+  /**
+   * Per-thread formatter for the task part of a result file name:
+   * grouping is disabled and at least 6 integer digits are produced.
+   */
+  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_TASK =
+      new ThreadLocal<NumberFormat>() {
+        @Override
+        public NumberFormat initialValue() {
+          NumberFormat fmt = NumberFormat.getInstance();
+          fmt.setGroupingUsed(false);
+          fmt.setMinimumIntegerDigits(6);
+          return fmt;
+        }
+      };
+
+  /**
+   * Per-thread formatter for the sequence part of a result file name:
+   * grouping is disabled and at least 3 integer digits are produced.
+   */
+  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SEQ =
+      new ThreadLocal<NumberFormat>() {
+        @Override
+        public NumberFormat initialValue() {
+          NumberFormat fmt = NumberFormat.getInstance();
+          fmt.setGroupingUsed(false);
+          fmt.setMinimumIntegerDigits(3);
+          return fmt;
+        }
+      };
+
+  /** File system of the warehouse directory; assigned in storageInit(). */
+  protected FileSystem fs;
+  /** Warehouse directory that table names are resolved against; assigned in storageInit(). */
+  protected Path tableBaseDir;
+  /** Value of the HDFS block-metadata configuration flag, read in storageInit(). */
+  protected boolean blocksMetadataEnabled;
+  // Reference volume id; getDiskIds() subtracts its hash code from the hash code of each volume id.
+  private static final HdfsVolumeId zeroVolumeId = new 
HdfsVolumeId(Bytes.toBytes(0));
+
+  /**
+   * Creates a file-based tablespace.
+   *
+   * @param storeType store type name, handed unchanged to the Tablespace constructor
+   */
+  public FileTablespace(String storeType) {
+    super(storeType);
+  }
+
+  /**
+   * Resolves the warehouse directory and its file system from the configuration and
+   * reads the HDFS block-metadata flag, logging a warning when the flag is off.
+   *
+   * @throws IOException if the file system of the warehouse directory cannot be obtained
+   */
+  @Override
+  protected void storageInit() throws IOException {
+    this.tableBaseDir = TajoConf.getWarehouseDir(conf);
+    this.fs = tableBaseDir.getFileSystem(conf);
+    this.blocksMetadataEnabled = 
conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
+        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+    // getSplits() only collects per-block volume information when this flag is set.
+    if (!this.blocksMetadataEnabled)
+      LOG.warn("does not support block metadata. 
('dfs.datanode.hdfs-blocks-metadata.enabled')");
+  }
+
+  /**
+   * Opens a scanner over a whole file, looking up the file status first.
+   *
+   * @param meta table meta passed on to the scanner
+   * @param schema schema passed on to the scanner
+   * @param path file to scan
+   * @return a scanner covering the entire file
+   * @throws IOException if the file status cannot be read or the scanner cannot be created
+   */
+  public Scanner getFileScanner(TableMeta meta, Schema schema, Path path)
+      throws IOException {
+    FileStatus fileStatus = path.getFileSystem(conf).getFileStatus(path);
+    return getFileScanner(meta, schema, path, fileStatus);
+  }
+
+  /**
+   * Opens a scanner over a whole file whose status is already known.
+   *
+   * @param meta table meta passed on to the scanner
+   * @param schema schema passed on to the scanner
+   * @param path file to scan; its last path component becomes the fragment id
+   * @param status status of the file, used for its length
+   * @return a scanner for the fragment spanning offset 0 to the file length
+   * @throws IOException if the scanner cannot be created
+   */
+  public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status)
+      throws IOException {
+    final long fileLength = status.getLen();
+    final Fragment wholeFile = new FileFragment(path.getName(), path, 0, fileLength);
+    return getScanner(meta, schema, wholeFile);
+  }
+
+  /**
+   * @return the file system held in the {@code fs} field (assigned in storageInit())
+   */
+  public FileSystem getFileSystem() {
+    return this.fs;
+  }
+
+  /**
+   * Recursively deletes the given path on the file system that owns it.
+   * The boolean result of the underlying delete call is not reported.
+   *
+   * @param tablePath path to remove
+   * @throws IOException if the file system cannot be obtained or the delete fails
+   */
+  public void delete(Path tablePath) throws IOException {
+    tablePath.getFileSystem(conf).delete(tablePath, true);
+  }
+
+  /**
+   * Tells whether the path exists on the file system that owns it.
+   *
+   * @param path path to check
+   * @return the result of the file system's existence check
+   * @throws IOException if the file system cannot be obtained or queried
+   */
+  public boolean exists(Path path) throws IOException {
+    return path.getFileSystem(conf).exists(path);
+  }
+
+  /**
+   * Resolves a table name against the warehouse base directory.
+   *
+   * @param tableName name used as the child path component
+   * @return {@code tableBaseDir/tableName}
+   */
+  public Path getTablePath(String tableName) {
+    return new Path(tableBaseDir, tableName);
+  }
+
+  // Partition prefix appended to the table path by getNonForwardSplit(); empty means "scan the whole table".
+  private String partitionPath = "";
+  // Starting depth handed to the directory walk in getNonForwardSplit() when partitionPath is set.
+  private int currentDepth = 0;
+
+  /**
+   * Set a specific partition path for partition-column only queries.
+   * The value is concatenated directly after the table path, without any separator being added.
+   * @param path The partition prefix path
+   */
+  public void setPartitionPath(String path) { partitionPath = path; }
+
+  /**
+   * Set a depth of partition path for partition-column only queries.
+   * It is only read when a partition path has been set.
+   * @param depth Depth of partitions
+   */
+  public void setCurrentDepth(int depth) { currentDepth = depth; }
+
+  /**
+   * Test-only shortcut that requests an appender with the first two arguments of the
+   * five-argument getAppender overload set to null.
+   *
+   * @param meta table meta of the data to write
+   * @param schema schema of the data to write
+   * @param filePath path handed to the five-argument overload
+   * @return the appender returned by the five-argument overload
+   * @throws IOException if the appender cannot be created
+   */
+  @VisibleForTesting
+  public Appender getAppender(TableMeta meta, Schema schema, Path filePath)
+      throws IOException {
+    return getAppender(null, null, meta, schema, filePath);
+  }
+
+  /**
+   * Splits the files of a warehouse table into fragments sized by the default block size
+   * of the warehouse file system.
+   *
+   * @param tableName table name, resolved against the warehouse base directory
+   * @return the fragments of all files listed under the table directory
+   * @throws IOException if the table directory cannot be listed
+   */
+  public FileFragment[] split(String tableName) throws IOException {
+    final Path warehouseTablePath = new Path(tableBaseDir, tableName);
+    final long blockSize = fs.getDefaultBlockSize();
+    return split(tableName, warehouseTablePath, blockSize);
+  }
+
+  /**
+   * Splits the files of a warehouse table into fragments of at most the given size.
+   *
+   * @param tableName table name, resolved against the warehouse base directory
+   * @param fragmentSize maximum fragment length in bytes
+   * @return the fragments of all files listed under the table directory
+   * @throws IOException if the table directory cannot be listed
+   */
+  public FileFragment[] split(String tableName, long fragmentSize) throws IOException {
+    return split(tableName, new Path(tableBaseDir, tableName), fragmentSize);
+  }
+
+  /**
+   * Splits the files under a path into fragments sized by the default block size of the
+   * file system that owns the path. The last path component is used as the fragment id.
+   *
+   * @param tablePath directory whose files are split
+   * @return the fragments of all files listed under the path
+   * @throws IOException if the path cannot be listed
+   */
+  public FileFragment[] split(Path tablePath) throws IOException {
+    final FileSystem owningFs = tablePath.getFileSystem(conf);
+    final String fragmentId = tablePath.getName();
+    return split(fragmentId, tablePath, owningFs.getDefaultBlockSize());
+  }
+
+  /**
+   * Splits the files under {@code tablePath} into fragments named after {@code tableName}.
+   * NOTE(review): the fragment size is the default block size of the warehouse file system
+   * ({@code this.fs}), not of the file system owning {@code tablePath} as in split(Path) -
+   * confirm this is intended.
+   *
+   * @param tableName fragment id given to every fragment
+   * @param tablePath directory whose files are split
+   * @return the fragments of all files listed under the path
+   * @throws IOException if the path cannot be listed
+   */
+  public FileFragment[] split(String tableName, Path tablePath) throws 
IOException {
+    return split(tableName, tablePath, fs.getDefaultBlockSize());
+  }
+
+  /**
+   * Cuts every entry listed directly under {@code tablePath} into consecutive fragments of
+   * {@code size} bytes, followed by one tail fragment holding the remainder. An entry no
+   * longer than {@code size} (including an empty one) yields a single fragment.
+   *
+   * @param tableName fragment id given to every fragment
+   * @param tablePath directory to list
+   * @param size fragment length in bytes; must be positive whenever an entry is longer than it
+   * @return all fragments, in listing order
+   * @throws IOException if the path cannot be listed
+   * @throws IllegalArgumentException if an entry would have to be cut with a non-positive size
+   */
+  private FileFragment[] split(String tableName, Path tablePath, long size)
+      throws IOException {
+    FileSystem fs = tablePath.getFileSystem(conf);
+    List<FileFragment> fragments = new ArrayList<FileFragment>();
+
+    for (FileStatus file : fs.listStatus(tablePath)) {
+      long remaining = file.getLen();
+      long start = 0;
+
+      // With a non-positive size the loop below could never shrink 'remaining' and would
+      // keep adding fragments forever, so fail fast instead.
+      if (remaining > size && size <= 0) {
+        throw new IllegalArgumentException("Fragment size must be positive: " + size);
+      }
+
+      while (remaining > size) {
+        fragments.add(new FileFragment(tableName, file.getPath(), start, size));
+        start += size;
+        remaining -= size;
+      }
+      // Tail of a cut file, or the whole file when it was never cut.
+      fragments.add(new FileFragment(tableName, file.getPath(), start, remaining));
+    }
+
+    return fragments.toArray(new FileFragment[fragments.size()]);
+  }
+
+  /**
+   * Static variant of the table splitter: cuts every entry listed directly under
+   * {@code tablePath} into consecutive fragments of {@code size} bytes, followed by one tail
+   * fragment holding the remainder. An entry no longer than {@code size} yields a single fragment.
+   *
+   * @param conf configuration used to obtain the file system of {@code tablePath}
+   * @param tableName fragment id given to every fragment
+   * @param meta not used by this method
+   * @param tablePath directory to list
+   * @param size fragment length in bytes; must be positive whenever an entry is longer than it
+   * @return all fragments, in listing order
+   * @throws IOException if the path cannot be listed
+   * @throws IllegalArgumentException if an entry would have to be cut with a non-positive size
+   */
+  public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
+                                       Path tablePath, long size)
+      throws IOException {
+    FileSystem fs = tablePath.getFileSystem(conf);
+    List<FileFragment> fragments = new ArrayList<FileFragment>();
+
+    for (FileStatus file : fs.listStatus(tablePath)) {
+      long remaining = file.getLen();
+      long start = 0;
+
+      // With a non-positive size the loop below could never shrink 'remaining' and would
+      // keep adding fragments forever, so fail fast instead.
+      if (remaining > size && size <= 0) {
+        throw new IllegalArgumentException("Fragment size must be positive: " + size);
+      }
+
+      while (remaining > size) {
+        fragments.add(new FileFragment(tableName, file.getPath(), start, size));
+        start += size;
+        remaining -= size;
+      }
+      // Tail of a cut file, or the whole file when it was never cut.
+      fragments.add(new FileFragment(tableName, file.getPath(), start, remaining));
+    }
+
+    return fragments.toArray(new FileFragment[fragments.size()]);
+  }
+
+  /**
+   * Reports the total length of the content under a path.
+   *
+   * @param tablePath path to measure
+   * @return the length from the path's content summary, or 0 when the path does not exist
+   * @throws IOException if the file system cannot be obtained or queried
+   */
+  public long calculateSize(Path tablePath) throws IOException {
+    FileSystem owningFs = tablePath.getFileSystem(conf);
+    if (!owningFs.exists(tablePath)) {
+      return 0;
+    }
+    return owningFs.getContentSummary(tablePath).getLength();
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+  // FileInputFormat Area
+  /////////////////////////////////////////////////////////////////////////////
+  /**
+   * Builds the path of the result file written by a task attempt.
+   * The file is placed in the result directory under {@code workDir} and is named
+   * {@code part-<stage>-<task>-<seq>}: the execution block id formatted with at least 2 digits,
+   * the task id with at least 6 digits, and the sequence number 0 with at least 3 digits.
+   *
+   * @param taskAttemptId attempt whose task and execution block ids name the file; may be null
+   * @param workDir working directory of the task
+   * @return {@code workDir} itself when {@code taskAttemptId} is null, otherwise the result file path
+   */
+  public Path getAppenderFilePath(TaskAttemptId taskAttemptId, Path workDir) {
+    // Test cases pass no attempt id and use the work directory as is.
+    if (taskAttemptId == null) {
+      return workDir;
+    }
+
+    String stagePart =
+        OUTPUT_FILE_FORMAT_STAGE.get().format(taskAttemptId.getTaskId().getExecutionBlockId().getId());
+    String taskPart = OUTPUT_FILE_FORMAT_TASK.get().format(taskAttemptId.getTaskId().getId());
+    String seqPart = OUTPUT_FILE_FORMAT_SEQ.get().format(0);
+    String fileName = OUTPUT_FILE_PREFIX + stagePart + "-" + taskPart + "-" + seqPart;
+
+    Path outFilePath = StorageUtil.concatPath(workDir, TajoConstants.RESULT_DIR_NAME, fileName);
+    LOG.info("Output File Path: " + outFilePath);
+
+    return outFilePath;
+  }
+
+  /**
+   * Proxy PathFilter that accepts a path only if all filters given in the
+   * constructor do. Used by the listPaths() to apply the built-in
+   * hiddenFileFilter together with a user provided one (if any).
+   */
+  private static class MultiPathFilter implements PathFilter {
+    private List<PathFilter> filters;
+
+    public MultiPathFilter(List<PathFilter> filters) {
+      this.filters = filters;
+    }
+
+    public boolean accept(Path path) {
+      for (PathFilter filter : filters) {
+        if (!filter.accept(path)) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Lists the input files matched by the given paths, skipping hidden files.
+   * Each path is expanded as a glob; a matched directory contributes its (filtered) direct
+   * children, any other match contributes itself.
+   * Subclasses may override to, e.g., select only files matching a regular expression.
+   *
+   * @param dirs paths or glob patterns to expand
+   * @return the statuses of all matched files
+   * @throws IOException if no path is given, or (as an InvalidInputException) if any path
+   *         does not exist or matches nothing
+   */
+  protected List<FileStatus> listStatus(Path... dirs) throws IOException {
+    if (dirs.length == 0) {
+      throw new IOException("No input paths specified in job");
+    }
+
+    // Only the built-in hidden-file filter is chained here.
+    List<PathFilter> filterChain = new ArrayList<PathFilter>();
+    filterChain.add(hiddenFileFilter);
+    PathFilter inputFilter = new MultiPathFilter(filterChain);
+
+    List<FileStatus> found = new ArrayList<FileStatus>();
+    List<IOException> problems = new ArrayList<IOException>();
+
+    for (Path dir : dirs) {
+      FileSystem dirFs = dir.getFileSystem(conf);
+      FileStatus[] matched = dirFs.globStatus(dir, inputFilter);
+
+      // Problems are collected so that all bad inputs are reported together.
+      if (matched == null) {
+        problems.add(new IOException("Input path does not exist: " + dir));
+        continue;
+      }
+      if (matched.length == 0) {
+        problems.add(new IOException("Input Pattern " + dir + " matches 0 files"));
+        continue;
+      }
+
+      for (FileStatus match : matched) {
+        if (!match.isDirectory()) {
+          found.add(match);
+          continue;
+        }
+        for (FileStatus child : dirFs.listStatus(match.getPath(), inputFilter)) {
+          found.add(child);
+        }
+      }
+    }
+
+    if (!problems.isEmpty()) {
+      throw new InvalidInputException(problems);
+    }
+    LOG.info("Total input paths to process : " + found.size());
+    return found;
+  }
+
+  /**
+   * Tells whether the given file may be cut into several fragments.
+   * The answer is obtained by opening a file scanner on the file and asking it; the scanner
+   * is always closed again, even when the query fails.
+   * <p/>
+   * Subclasses can override this and return <code>false</code> to ensure that individual
+   * input files are never split up.
+   *
+   * @param meta table meta used to create the scanner
+   * @param schema schema used to create the scanner
+   * @param path the file to check
+   * @param status status of the file, used for its length
+   * @return the scanner's answer to whether the file is splittable
+   * @throws IOException if the scanner cannot be created, queried or closed
+   */
+  protected boolean isSplittable(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException {
+    Scanner scanner = getFileScanner(meta, schema, path, status);
+    try {
+      return scanner.isSplittable();
+    } finally {
+      // Release the scanner on every path so a failing query does not leak it.
+      scanner.close();
+    }
+  }
+
+  private static final double SPLIT_SLOP = 1.1;   // 10% slop
+
+  /**
+   * Finds the block that contains a byte offset.
+   *
+   * @param blkLocations block locations of one file
+   * @param offset byte offset to look up
+   * @return index of the first block whose range [offset, offset + length) contains the offset
+   * @throws IllegalArgumentException if no block contains the offset
+   */
+  protected int getBlockIndex(BlockLocation[] blkLocations,
+                              long offset) {
+    for (int idx = 0; idx < blkLocations.length; idx++) {
+      BlockLocation candidate = blkLocations[idx];
+      long blockStart = candidate.getOffset();
+      long blockEnd = blockStart + candidate.getLength();
+      if (offset >= blockStart && offset < blockEnd) {
+        return idx;
+      }
+    }
+
+    // Not found: report the last byte position covered by the final block.
+    BlockLocation tail = blkLocations[blkLocations.length - 1];
+    long fileLength = tail.getOffset() + tail.getLength() - 1;
+    throw new IllegalArgumentException("Offset " + offset +
+        " is outside of file (0.." +
+        fileLength + ")");
+  }
+
+  /**
+   * A factory that makes the split for this class. It can be overridden
+   * by sub-classes to make sub-types.
+   * This overload creates a fragment without host information.
+   *
+   * @param fragmentId id given to the fragment
+   * @param file file the fragment belongs to
+   * @param start start offset of the fragment
+   * @param length length of the fragment
+   */
+  protected FileFragment makeSplit(String fragmentId, Path file, long start, 
long length) {
+    return new FileFragment(fragmentId, file, start, length);
+  }
+
+  /**
+   * Creates a fragment that also carries the given hosts.
+   *
+   * @param hosts host names passed to the fragment
+   */
+  protected FileFragment makeSplit(String fragmentId, Path file, long start, 
long length,
+                                   String[] hosts) {
+    return new FileFragment(fragmentId, file, start, length, hosts);
+  }
+
+  /**
+   * Creates a fragment directly from a block location.
+   *
+   * @param blockLocation block location passed to the fragment constructor
+   * @throws IOException declared for the fragment constructor that takes a block location
+   */
+  protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation 
blockLocation)
+      throws IOException {
+    return new FileFragment(fragmentId, file, blockLocation);
+  }
+
+  /**
+   * Creates one fragment for a file that must not be split (e.g. a gzip-compressed text file).
+   * The fragment's hosts are the hosts that appear in the most block locations; as many hosts
+   * are chosen as the first block location lists.
+   *
+   * @param fragmentId id given to the fragment
+   * @param file file the fragment belongs to
+   * @param start start offset of the fragment
+   * @param length length of the fragment
+   * @param blkLocations block locations of the file
+   * @return the fragment with its selected hosts
+   * @throws IOException if the hosts of a block location cannot be read
+   */
+  protected FileFragment makeNonSplit(String fragmentId, Path file, long start, long length,
+                                      BlockLocation[] blkLocations) throws IOException {
+
+    // Count, per host, in how many block locations it appears.
+    Map<String, Integer> blocksPerHost = new HashMap<String, Integer>();
+    for (BlockLocation location : blkLocations) {
+      for (String host : location.getHosts()) {
+        Integer seen = blocksPerHost.get(host);
+        blocksPerHost.put(host, seen == null ? 1 : seen + 1);
+      }
+    }
+
+    // Ascending by count; the best hosts therefore sit at the end of the list.
+    List<Map.Entry<String, Integer>> ranked =
+        new ArrayList<Map.Entry<String, Integer>>(blocksPerHost.entrySet());
+    Collections.sort(ranked, new Comparator<Map.Entry<String, Integer>>() {
+
+      @Override
+      public int compare(Map.Entry<String, Integer> left, Map.Entry<String, Integer> right) {
+        return left.getValue().compareTo(right.getValue());
+      }
+    });
+
+    int hostCount = blkLocations[0].getHosts().length;
+    String[] hosts = new String[hostCount];
+
+    // Walk the ranking backwards so hosts[0] is the host with the most blocks.
+    int rank = ranked.size() - 1;
+    for (int i = 0; i < hostCount; i++, rank--) {
+      hosts[i] = ranked.get(rank).getKey();
+    }
+    return new FileFragment(fragmentId, file, start, length, hosts);
+  }
+
+  /**
+   * Get the minimum split size, read from the configuration variable
+   * {@code TajoConf.ConfVars.MINIMUM_SPLIT_SIZE}.
+   *
+   * @return the minimum number of bytes that can be in a split
+   */
+  public long getMinSplitSize() {
+    return conf.getLongVar(TajoConf.ConfVars.MINIMUM_SPLIT_SIZE);
+  }
+
+  /**
+   * Translates volume ids into disk ids.
+   * A volume id that is null or whose hash code is not positive maps to -1; any other maps to
+   * its hash code minus the hash code of the zero volume id.
+   *
+   * @param volumeIds volume ids of the replicas of one block
+   * @return one disk id per volume id, in the same order
+   */
+  private int[] getDiskIds(VolumeId[] volumeIds) {
+    int[] diskIds = new int[volumeIds.length];
+    for (int i = 0; i < diskIds.length; i++) {
+      VolumeId volumeId = volumeIds[i];
+      boolean usable = volumeId != null && volumeId.hashCode() > 0;
+      diskIds[i] = usable ? volumeId.hashCode() - zeroVolumeId.hashCode() : -1;
+    }
+    return diskIds;
+  }
+
+  /**
+   * Groups the disk ids of the given fragments by host.
+   * Every host of every fragment gets an entry, even when no disk id is known for it;
+   * disk ids that are missing or negative are left out of the host's set.
+   *
+   * @param frags fragments carrying host names and disk ids
+   * @return map from host name to the set of disk ids seen on that host
+   */
+  private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) {
+    Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
+    for (FileFragment frag : frags) {
+      String[] hosts = frag.getHosts();
+      int[] diskIds = frag.getDiskIds();
+      for (int i = 0; i < hosts.length; i++) {
+        Set<Integer> volumeList = volumeMap.get(hosts[i]);
+        if (volumeList == null) {
+          volumeList = new HashSet<Integer>();
+          volumeMap.put(hosts[i], volumeList);
+        }
+
+        // Disk ids may be absent for some or all hosts; only index positions that exist
+        // instead of running past the end of a shorter array.
+        if (i < diskIds.length && diskIds[i] > -1) {
+          volumeList.add(diskIds[i]);
+        }
+      }
+    }
+
+    return volumeMap;
+  }
+  /**
+   * Generate the list of files and make them into FileSplits.
+   * Each input is either a single file or a location expanded through listStatus().
+   * Non-empty files are split per block location (when block metadata is enabled and the
+   * file system is a DistributedFileSystem) or by size; empty files yield one empty fragment.
+   * Fragments created per block location are collected separately, enriched with disk ids by
+   * setVolumeMeta() and appended after all other fragments.
+   *
+   * @param tableName fragment id given to every fragment
+   * @param meta table meta used to decide whether a file is splittable
+   * @param schema schema used to decide whether a file is splittable
+   * @param inputs files or locations to split
+   * @return all fragments of all inputs
+   * @throws IOException
+   */
+  public List<Fragment> getSplits(String tableName, TableMeta meta, Schema 
schema, Path... inputs)
+      throws IOException {
+    // generate splits'
+
+    List<Fragment> splits = Lists.newArrayList();
+    // Per-block fragments and their block locations, kept index-aligned for setVolumeMeta().
+    List<Fragment> volumeSplits = Lists.newArrayList();
+    List<BlockLocation> blockLocations = Lists.newArrayList();
+
+    for (Path p : inputs) {
+      FileSystem fs = p.getFileSystem(conf);
+
+      ArrayList<FileStatus> files = Lists.newArrayList();
+      if (fs.isFile(p)) {
+        files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
+      } else {
+        files.addAll(listStatus(p));
+      }
+
+      // Remembered only for the debug message at the end of this iteration.
+      int previousSplitSize = splits.size();
+      for (FileStatus file : files) {
+        Path path = file.getPath();
+        long length = file.getLen();
+        if (length > 0) {
+          // Get locations of blocks of file
+          BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, 
length);
+          boolean splittable = isSplittable(meta, schema, path, file);
+          if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
+
+            if (splittable) {
+              // One fragment per block location.
+              for (BlockLocation blockLocation : blkLocations) {
+                volumeSplits.add(makeSplit(tableName, path, blockLocation));
+              }
+              blockLocations.addAll(Arrays.asList(blkLocations));
+
+            } else { // Non splittable
+              long blockSize = blkLocations[0].getLength();
+              // The first block is at least as long as the file, so per-block fragments
+              // cannot cut the file apart.
+              if (blockSize >= length) {
+                blockLocations.addAll(Arrays.asList(blkLocations));
+                for (BlockLocation blockLocation : blkLocations) {
+                  volumeSplits.add(makeSplit(tableName, path, blockLocation));
+                }
+              } else {
+                splits.add(makeNonSplit(tableName, path, 0, length, 
blkLocations));
+              }
+            }
+
+          } else {
+            if (splittable) {
+
+              long minSize = Math.max(getMinSplitSize(), 1);
+
+              long blockSize = file.getBlockSize(); // s3n rest api contained 
block size but blockLocations is one
+              long splitSize = Math.max(minSize, blockSize);
+              long bytesRemaining = length;
+
+              // for s3
+              // Emit full-size fragments while more than SPLIT_SLOP split sizes remain;
+              // a remainder within the slop is emitted as one final fragment below.
+              while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
+                int blkIndex = getBlockIndex(blkLocations, length - 
bytesRemaining);
+                splits.add(makeSplit(tableName, path, length - bytesRemaining, 
splitSize,
+                    blkLocations[blkIndex].getHosts()));
+                bytesRemaining -= splitSize;
+              }
+              if (bytesRemaining > 0) {
+                int blkIndex = getBlockIndex(blkLocations, length - 
bytesRemaining);
+                splits.add(makeSplit(tableName, path, length - bytesRemaining, 
bytesRemaining,
+                    blkLocations[blkIndex].getHosts()));
+              }
+            } else { // Non splittable
+              splits.add(makeNonSplit(tableName, path, 0, length, 
blkLocations));
+            }
+          }
+        } else {
+          //for zero length files
+          splits.add(makeSplit(tableName, path, 0, length));
+        }
+      }
+      // NOTE(review): fragments collected in volumeSplits are not counted here because they
+      // are appended to 'splits' only after the loop.
+      if(LOG.isDebugEnabled()){
+        LOG.debug("# of splits per partition: " + (splits.size() - 
previousSplitSize));
+      }
+    }
+
+    // Combine original fileFragments with new VolumeId information
+    setVolumeMeta(volumeSplits, blockLocations);
+    splits.addAll(volumeSplits);
+    LOG.info("Total # of splits: " + splits.size());
+    return splits;
+  }
+
+  /**
+   * Attaches disk ids to fragments that were created one per block location.
+   * Both lists must be index-aligned and equally long; otherwise nothing is changed.
+   * Block storage locations are requested in batches of at most the configured list limit.
+   *
+   * @param splits fragments to enrich; each must be a FileFragment
+   * @param blockLocations block locations matching {@code splits} position by position
+   * @throws IOException if the volume information cannot be fetched
+   */
+  private void setVolumeMeta(List<Fragment> splits, final List<BlockLocation> blockLocations)
+      throws IOException {
+
+    int locationSize = blockLocations.size();
+    int splitSize = splits.size();
+    if (locationSize == 0 || splitSize == 0) return;
+
+    if (locationSize != splitSize) {
+      // splits and locations don't match up
+      LOG.warn("Number of block locations not equal to number of splits: "
+          + "#locations=" + locationSize
+          + " #splits=" + splitSize);
+      return;
+    }
+
+    // NOTE(review): this casts the file system returned for the configuration rather than the
+    // one owning the fragments' paths - confirm it is always a DistributedFileSystem here.
+    DistributedFileSystem fs = (DistributedFileSystem)DistributedFileSystem.get(conf);
+    // A limit below 1 would produce empty (or invalid) batches and the loop could not advance.
+    int lsLimit = Math.max(1,
+        conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT));
+    int blockLocationIdx = 0;
+
+    Iterator<Fragment> iter = splits.iterator();
+    while (locationSize > blockLocationIdx) {
+
+      int subSize = Math.min(locationSize - blockLocationIdx, lsLimit);
+      List<BlockLocation> locations = blockLocations.subList(blockLocationIdx, blockLocationIdx + subSize);
+      //BlockStorageLocation containing additional volume location information for each replica of each block.
+      BlockStorageLocation[] blockStorageLocations = fs.getFileBlockStorageLocations(locations);
+
+      // Progress is driven by the returned entries; an empty answer would repeat the same
+      // request forever, so stop and leave the remaining fragments without disk ids.
+      if (blockStorageLocations.length == 0) {
+        LOG.warn("No volume information returned for block locations starting at index "
+            + blockLocationIdx);
+        return;
+      }
+
+      for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
+        ((FileFragment)iter.next()).setDiskIds(getDiskIds(blockStorageLocation.getVolumeIds()));
+        blockLocationIdx++;
+      }
+    }
+    LOG.info("# of splits with volumeId " + splitSize);
+  }
+
+  private static class InvalidInputException extends IOException {
+    List<IOException> errors;
+    public InvalidInputException(List<IOException> errors) {
+      this.errors = errors;
+    }
+
+    @Override
+    public String getMessage(){
+      StringBuffer sb = new StringBuffer();
+      int messageLimit = Math.min(errors.size(), 10);
+      for (int i = 0; i < messageLimit ; i ++) {
+        sb.append(errors.get(i).getMessage()).append("\n");
+      }
+
+      if(messageLimit < errors.size())
+        sb.append("skipped .....").append("\n");
+
+      return sb.toString();
+    }
+  }
+
+  /**
+   * Splits the whole table located at the path recorded in the table description.
+   * The scan node is not used by this implementation.
+   *
+   * @param tableName fragment id given to every fragment
+   * @param table description supplying meta, schema and path
+   * @param scanNode not used
+   * @return the fragments of the table
+   * @throws IOException if the table files cannot be listed or inspected
+   */
+  @Override
+  public List<Fragment> getSplits(String tableName, TableDesc table, ScanNode scanNode) throws IOException {
+    final TableMeta tableMeta = table.getMeta();
+    final Schema tableSchema = table.getSchema();
+    final Path tableLocation = new Path(table.getPath());
+    return getSplits(tableName, tableMeta, tableSchema, tableLocation);
+  }
+
+  /**
+   * Prepares the storage location of a table and records its initial statistics.
+   * A managed (non-external) table gets its path assigned under the warehouse directory and
+   * the directory is created; an external table must already have an existing path.
+   * The byte size of the location is stored in the table stats; for external tables the
+   * row count is marked as unknown.
+   * NOTE(review): {@code ifNotExists} is not read in this method - confirm callers handle it.
+   *
+   * @param tableDesc table description; its path and stats are updated
+   * @param ifNotExists not used here
+   * @throws IOException if an external table's path does not exist or the file system fails
+   */
+  @Override
+  public void createTable(TableDesc tableDesc, boolean ifNotExists) throws 
IOException {
+    if (!tableDesc.isExternal()) {
+      String [] splitted = CatalogUtil.splitFQTableName(tableDesc.getName());
+      String databaseName = splitted[0];
+      String simpleTableName = splitted[1];
+
+      // create a table directory (i.e., 
${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} )
+      Path tablePath = StorageUtil.concatPath(tableBaseDir, databaseName, 
simpleTableName);
+      tableDesc.setPath(tablePath.toUri());
+    } else {
+      Preconditions.checkState(tableDesc.getPath() != null, "ERROR: LOCATION 
must be given.");
+    }
+
+    Path path = new Path(tableDesc.getPath());
+
+    FileSystem fs = path.getFileSystem(conf);
+    TableStats stats = new TableStats();
+    if (tableDesc.isExternal()) {
+      if (!fs.exists(path)) {
+        LOG.error(path.toUri() + " does not exist");
+        throw new IOException("ERROR: " + path.toUri() + " does not exist");
+      }
+    } else {
+      // The boolean result of mkdirs is not checked.
+      fs.mkdirs(path);
+    }
+
+    long totalSize = 0;
+
+    // Size calculation is best effort: on failure the size stays 0 and only a warning is logged.
+    try {
+      totalSize = calculateSize(path);
+    } catch (IOException e) {
+      LOG.warn("Cannot calculate the size of the relation", e);
+    }
+
+    stats.setNumBytes(totalSize);
+
+    if (tableDesc.isExternal()) { // if it is an external table, there is no 
way to know the exact row number without processing.
+      stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
+    }
+
+    tableDesc.setStats(stats);
+  }
+
+  @Override
+  public void purgeTable(TableDesc tableDesc) throws IOException {
+    try {
+      Path path = new Path(tableDesc.getPath());
+      FileSystem fs = path.getFileSystem(conf);
+      LOG.info("Delete table data dir: " + path);
+      fs.delete(path, true);
+    } catch (IOException e) {
+      throw new InternalError(e.getMessage());
+    }
+  }
+
+  /**
+   * Collects fragments over non-empty data files of a table for a non-forward query.
+   * Files are gathered from the table path, or from the table path plus the configured
+   * partition path when one is set. For a partitioned table only files sharing the partition
+   * directory names of the first file are returned; collection stops at the first file that
+   * belongs to another partition.
+   *
+   * @param tableDesc table whose files are listed
+   * @param currentPage passed to the file walk as the number of qualifying files to skip
+   * @param numResultFragments maximum number of files to collect
+   * @return one whole-file fragment per selected data file
+   * @throws IOException if the file system cannot be read
+   */
+  @Override
+  public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numResultFragments) throws IOException {
+    Path tablePath = new Path(tableDesc.getPath());
+    FileSystem fs = tablePath.getFileSystem(conf);
+
+    // Number of partition directory levels between the table path and its data files.
+    int partitionDepth = tableDesc.hasPartition()
+        ? tableDesc.getPartitionMethod().getExpressionSchema().getRootColumns().size()
+        : 0;
+
+    List<FileStatus> dataFiles = new ArrayList<FileStatus>();
+    if (fs.exists(tablePath)) {
+      final boolean wholeTable = partitionPath.isEmpty();
+      final Path scanRoot = wholeTable ? tablePath : new Path(tableDesc.getPath() + partitionPath);
+      final int startDepth = wholeTable ? 0 : this.currentDepth;
+
+      // The table path is already known to exist; a partition path is checked separately.
+      if (wholeTable || fs.exists(scanRoot)) {
+        getNonZeroLengthDataFiles(fs, scanRoot, dataFiles, currentPage, numResultFragments,
+            new AtomicInteger(0), tableDesc.hasPartition(), startDepth, partitionDepth);
+      }
+    }
+
+    List<Fragment> fragments = new ArrayList<Fragment>();
+    String[] lastPartitionKey = null;
+
+    for (FileStatus dataFile : dataFiles) {
+      FileFragment fileFragment =
+          new FileFragment(tableDesc.getName(), dataFile.getPath(), 0, dataFile.getLen(), null);
+
+      if (partitionDepth <= 0) {
+        fragments.add(fileFragment);
+        continue;
+      }
+
+      // The partition key is the chain of parent directory names, outermost first.
+      String[] partitionKey = new String[partitionDepth];
+      Path ancestor = fileFragment.getPath();
+      for (int slot = partitionDepth - 1; slot >= 0; slot--) {
+        ancestor = ancestor.getParent();
+        partitionKey[slot] = ancestor.getName();
+      }
+
+      boolean samePartition = lastPartitionKey == null || Arrays.equals(lastPartitionKey, partitionKey);
+      if (!samePartition) {
+        break;
+      }
+      fragments.add(fileFragment);
+      lastPartitionKey = partitionKey;
+    }
+
+    return fragments;
+  }
+
+  /**
+   * Walks a path depth-first and collects non-empty files into {@code result}.
+   * Hidden entries are skipped when listing directories. For a partitioned table, files
+   * found while listing a directory count only when that directory is at {@code maxDepth}.
+   *
+   * @param fs file system to read from
+   * @param path The table path (or any path below it) to visit
+   * @param result The final result files to be used
+   * @param startFileIndex number of qualifying files to pass over before collecting
+   * @param numResultFiles directory listing stops once {@code result} holds this many files
+   * @param currentFileIndex running count of qualifying files seen, shared by the whole walk
+   * @param partitioned A flag to indicate if this table is partitioned
+   * @param currentDepth Current visiting depth of partition directories
+   * @param maxDepth The partition depth of this table
+   * @throws IOException if the file system cannot be read
+   */
+  private void getNonZeroLengthDataFiles(FileSystem fs, Path path, List<FileStatus> result,
+                                         int startFileIndex, int numResultFiles,
+                                         AtomicInteger currentFileIndex, boolean partitioned,
+                                         int currentDepth, int maxDepth) throws IOException {
+    if (!fs.isDirectory(path)) {
+      // The path itself is a file: count it when it is non-empty.
+      FileStatus single = fs.getFileStatus(path);
+      if (single != null && single.getLen() > 0) {
+        if (currentFileIndex.getAndIncrement() >= startFileIndex) {
+          result.add(single);
+        }
+      }
+      return;
+    }
+
+    FileStatus[] children = fs.listStatus(path, Tablespace.hiddenFileFilter);
+    if (children == null || children.length == 0) {
+      return;
+    }
+
+    for (FileStatus child : children) {
+      // Stop as soon as enough files have been gathered.
+      if (result.size() >= numResultFiles) {
+        return;
+      }
+
+      if (child.isDirectory()) {
+        // Descend one partition level.
+        getNonZeroLengthDataFiles(fs, child.getPath(), result, startFileIndex, numResultFiles,
+            currentFileIndex, partitioned, currentDepth + 1, maxDepth);
+        continue;
+      }
+
+      // For a partitioned table, files in intermediate directories are ignored; a file is in
+      // a leaf directory exactly when currentDepth == maxDepth.
+      boolean atDataLevel = !partitioned || currentDepth == maxDepth;
+      if (child.isFile() && child.getLen() > 0 && atDataLevel) {
+        if (currentFileIndex.getAndIncrement() >= startFileIndex) {
+          result.add(child);
+        }
+      }
+    }
+  }
+
+  /**
+   * Describes the capabilities of this tablespace: sorted insert is never required, and
+   * INSERT INTO is supported for every store type except "RAW" (compared case-insensitively).
+   *
+   * @return a freshly created storage property
+   */
+  @Override
+  public StorageProperty getStorageProperty() {
+    final boolean rawStore = storeType.equalsIgnoreCase("RAW");
+
+    StorageProperty property = new StorageProperty();
+    property.setSortedInsert(false);
+    property.setSupportsInsertInto(!rawStore);
+    return property;
+  }
+
+  /** Does nothing in this implementation. */
+  @Override
+  public void close() {
+  }
+
+  /** Does nothing in this implementation; the node is not inspected. */
+  @Override
+  public void beforeInsertOrCATS(LogicalNode node) throws IOException {
+  }
+
+  /** Does nothing in this implementation; the node is not inspected. */
+  @Override
+  public void rollbackOutputCommit(LogicalNode node) throws IOException {
+  }
+
+  /** Performs no verification in this implementation; both arguments are ignored. */
+  @Override
+  public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) 
throws IOException {
+  }
+
+  /**
+   * This implementation supplies no rewrite rules.
+   *
+   * @return always {@code null} (not an empty list)
+   */
+  @Override
+  public List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf 
queryContext, TableDesc tableDesc)
+      throws IOException {
+    return null;
+  }
+
+  /**
+   * Commits the query output by delegating to the two-argument overload with
+   * {@code changeFileSeq} set to true. Only {@code queryContext} is used; the other
+   * arguments are ignored here.
+   *
+   * @return the path returned by the two-argument overload
+   * @throws IOException if the delegate fails
+   */
+  @Override
+  public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId 
finalEbId, LogicalPlan plan,
+                               Schema schema, TableDesc tableDesc) throws 
IOException {
+    return commitOutputData(queryContext, true);
+  }
+
+  /**
+   * Returns the insert sort ranges of this tablespace; this implementation computes none.
+   *
+   * @param queryContext unused by this implementation
+   * @param tableDesc unused by this implementation
+   * @param inputSchema unused by this implementation
+   * @param sortSpecs unused by this implementation
+   * @param dataRange unused by this implementation
+   * @return always {@code null}
+   * @throws IOException declared by the signature; never thrown by this body
+   */
+  @Override
+  public TupleRange[] getInsertSortRanges(OverridableConf queryContext, 
TableDesc tableDesc,
+                                          Schema inputSchema, SortSpec[] 
sortSpecs, TupleRange dataRange)
+      throws IOException {
+    return null;
+  }
+
+  /**
+   * Finalizes result data. Tajo stores result data in the staging directory.
+   * When an output table path is set in the query context, the staged results are moved
+   * to that path and the parent of the staging directory is removed afterwards.
+   * Otherwise nothing is moved and the staging result directory itself is returned.
+   *
+   * <p>Three cases are handled when an output table path is set:
+   * <ul>
+   *   <li>INSERT OVERWRITE: existing data is moved aside to a backup directory under the
+   *       staging directory, the new results take its place, and on an {@link IOException}
+   *       the previous data is moved back where possible.</li>
+   *   <li>INSERT INTO: staged files are moved next to the existing files, optionally with a
+   *       new sequence number in their names.</li>
+   *   <li>Anything else (CTAS): the staged result directory, or its children if the target
+   *       already exists, is renamed into the output directory.</li>
+   * </ul>
+   *
+   * @param queryContext The query property
+   * @param changeFileSeq If true change result file name with max sequence.
+   * @return Saved path
+   * @throws java.io.IOException if any step of the commit fails; the original failure is
+   *         preserved as the cause
+   */
+  protected Path commitOutputData(OverridableConf queryContext, boolean changeFileSeq) throws IOException {
+    Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR));
+    Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+    Path finalOutputDir;
+    if (!queryContext.get(QueryVars.OUTPUT_TABLE_PATH, "").isEmpty()) {
+      finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_PATH));
+      try {
+        FileSystem fs = stagingResultDir.getFileSystem(conf);
+
+        if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT OVERWRITE INTO
+
+          // It moves the original table into the temporary location.
+          // Then it moves the new result table into the original table location.
+          // Upon failed, it recovers the original table if possible.
+          boolean movedToOldTable = false;
+          boolean committed = false;
+          Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
+          ContentSummary summary = fs.getContentSummary(stagingResultDir);
+
+          if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty() && summary.getFileCount() > 0L) {
+            // This is a map for existing non-leaf directory to rename. A key is current directory and a value is
+            // renaming directory.
+            Map<Path, Path> renameDirs = TUtil.newHashMap();
+            // This is a map for recovering existing partition directory. A key is current directory and a value is
+            // temporary directory to back up.
+            Map<Path, Path> recoveryDirs = TUtil.newHashMap();
+            // Target directories this commit has already started to replace. Only these may be
+            // removed on rollback; targets that were never reached still hold the original data.
+            List<Path> replacedDirs = Lists.newArrayList();
+
+            try {
+              if (!fs.exists(finalOutputDir)) {
+                fs.mkdirs(finalOutputDir);
+              }
+
+              visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(),
+                  renameDirs, oldTableDir);
+
+              // Rename target partition directories
+              for (Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
+                // Backup existing data files for recovering
+                if (fs.exists(entry.getValue())) {
+                  // Literal replacement: paths must not be interpreted as regular expressions.
+                  String recoveryPathString = entry.getValue().toString().replace(finalOutputDir.toString(),
+                      oldTableDir.toString());
+                  Path recoveryPath = new Path(recoveryPathString);
+                  // NOTE(review): visitPartitionedDirectory pre-creates recoveryPath, so on file systems
+                  // that move a source under an existing destination directory the backup may end up
+                  // nested one level deeper - confirm against the target file system.
+                  fs.rename(entry.getValue(), recoveryPath);
+                  recoveryDirs.put(entry.getValue(), recoveryPath);
+                }
+                // From here on the target no longer holds un-backed-up original data.
+                replacedDirs.add(entry.getValue());
+                // Delete existing directory
+                fs.delete(entry.getValue(), true);
+                // Rename staging directory to final output directory
+                fs.rename(entry.getKey(), entry.getValue());
+              }
+
+            } catch (IOException ioe) {
+              // Remove only the directories this commit put in place.
+              for (Path replacedDir : replacedDirs) {
+                fs.delete(replacedDir, true);
+              }
+
+              // Recovery renamed dirs: move each backup back to its original location.
+              // The backup itself must not be deleted first, otherwise nothing is left to restore.
+              for (Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) {
+                fs.rename(entry.getValue(), entry.getKey());
+              }
+
+              throw new IOException(ioe.getMessage(), ioe);
+            }
+          } else { // no partition
+            try {
+
+              // if the final output dir exists, move all contents to the temporary table dir.
+              // Otherwise, just make the final output dir. As a result, the final output dir will be empty.
+              if (fs.exists(finalOutputDir)) {
+                fs.mkdirs(oldTableDir);
+
+                for (FileStatus status : fs.listStatus(finalOutputDir, Tablespace.hiddenFileFilter)) {
+                  fs.rename(status.getPath(), oldTableDir);
+                }
+
+                movedToOldTable = fs.exists(oldTableDir);
+              } else { // if the parent does not exist, make its parent directory.
+                fs.mkdirs(finalOutputDir);
+              }
+
+              // Move the results to the final output dir.
+              for (FileStatus status : fs.listStatus(stagingResultDir)) {
+                fs.rename(status.getPath(), finalOutputDir);
+              }
+
+              // Check the final output dir
+              committed = fs.exists(finalOutputDir);
+
+            } catch (IOException ioe) {
+              // recover the old table
+              if (movedToOldTable && !committed) {
+
+                // if commit is failed, recover the old data
+                for (FileStatus status : fs.listStatus(finalOutputDir, Tablespace.hiddenFileFilter)) {
+                  fs.delete(status.getPath(), true);
+                }
+
+                for (FileStatus status : fs.listStatus(oldTableDir)) {
+                  fs.rename(status.getPath(), finalOutputDir);
+                }
+              }
+
+              throw new IOException(ioe.getMessage(), ioe);
+            }
+          }
+        } else {
+          String queryType = queryContext.get(QueryVars.COMMAND_TYPE);
+
+          if (queryType != null && queryType.equals(NodeType.INSERT.name())) { // INSERT INTO an existing table
+
+            NumberFormat fmt = NumberFormat.getInstance();
+            fmt.setGroupingUsed(false);
+            fmt.setMinimumIntegerDigits(3);
+
+            if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) {
+              for (FileStatus eachFile : fs.listStatus(stagingResultDir)) {
+                if (eachFile.isFile()) {
+                  LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath());
+                  continue;
+                }
+                moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1, changeFileSeq);
+              }
+            } else {
+              int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1;
+              for (FileStatus eachFile : fs.listStatus(stagingResultDir)) {
+                if (eachFile.getPath().getName().startsWith("_")) {
+                  continue;
+                }
+                moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++,
+                    changeFileSeq);
+              }
+            }
+            // checking all file moved and remove empty dir
+            verifyAllFileMoved(fs, stagingResultDir);
+            FileStatus[] files = fs.listStatus(stagingResultDir);
+            if (files != null && files.length != 0) {
+              for (FileStatus eachFile : files) {
+                LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
+              }
+            }
+          } else { // CREATE TABLE AS SELECT (CTAS)
+            if (fs.exists(finalOutputDir)) {
+              for (FileStatus status : fs.listStatus(stagingResultDir)) {
+                fs.rename(status.getPath(), finalOutputDir);
+              }
+            } else {
+              fs.rename(stagingResultDir, finalOutputDir);
+            }
+            LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir);
+          }
+        }
+
+        // remove the staging directory if the final output dir is given.
+        Path stagingDirRoot = stagingDir.getParent();
+        fs.delete(stagingDirRoot, true);
+      } catch (Throwable t) {
+        // Pass the throwable as the second argument so the stack trace is logged as well.
+        LOG.error(t.getMessage(), t);
+        throw new IOException(t);
+      }
+    } else {
+      finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+    }
+
+    return finalOutputDir;
+  }
+
+  /**
+   * Moves one staged entry into the final result path, optionally replacing the sequence
+   * number in the file name.
+   *
+   * <p>A directory is mirrored under the final output path (created if missing) and each of its
+   * children whose name does not start with "_" is moved recursively, numbered upwards from the
+   * highest sequence already present in the mirrored directory. A file is renamed to its
+   * mirrored location; a failed rename is only logged.
+   *
+   * @param fs FileSystem
+   * @param stagingResultDir The staging result dir
+   * @param fileStatus The file status
+   * @param finalOutputPath Final output path
+   * @param nf Number format
+   * @param fileSeq The sequence number
+   * @param changeFileSeq If true, the sequence part of the file name is replaced by {@code fileSeq}
+   * @throws java.io.IOException if a staged directory is not below the staging result dir, if the
+   *         target file already exists, or if a file system call fails
+   */
+  private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir,
+                                          FileStatus fileStatus, Path finalOutputPath,
+                                          NumberFormat nf,
+                                          int fileSeq, boolean changeFileSeq) throws IOException {
+    final boolean stagedIsDirectory = fileStatus.isDirectory();
+    final Path stagedPath = fileStatus.getPath();
+    final String relativePath = extractSubPath(stagingResultDir, stagedPath);
+
+    if (stagedIsDirectory) {
+      if (relativePath == null) {
+        throw new IOException("Wrong staging dir:" + stagingResultDir + "," + stagedPath);
+      }
+
+      Path targetDir = new Path(finalOutputPath, relativePath);
+      if (!fs.exists(targetDir)) {
+        fs.mkdirs(targetDir);
+      }
+
+      // Continue numbering after the highest sequence already present in the target.
+      int lastSeq = StorageUtil.getMaxFileSequence(fs, targetDir, false);
+      for (FileStatus child : fs.listStatus(stagedPath)) {
+        boolean hidden = child.getPath().getName().startsWith("_");
+        if (!hidden) {
+          moveResultFromStageToFinal(fs, stagingResultDir, child, finalOutputPath, nf, ++lastSeq, changeFileSeq);
+        }
+      }
+      return;
+    }
+
+    // A file that is not below the staging result dir is left untouched.
+    if (relativePath == null) {
+      return;
+    }
+
+    Path targetFile = new Path(finalOutputPath, relativePath);
+    if (changeFileSeq) {
+      targetFile = new Path(targetFile.getParent(), replaceFileNameSeq(targetFile, fileSeq, nf));
+    }
+
+    Path targetParent = targetFile.getParent();
+    if (!fs.exists(targetParent)) {
+      fs.mkdirs(targetParent);
+    }
+    if (fs.exists(targetFile)) {
+      throw new IOException("Already exists data file:" + targetFile);
+    }
+
+    if (fs.rename(stagedPath, targetFile)) {
+      LOG.info("Moving staging file[" + stagedPath + "] + " +
+          "to final output[" + targetFile + "]");
+    } else {
+      LOG.error("Can't move staging file[" + stagedPath + "] + " +
+          "to final output[" + targetFile + "]");
+    }
+  }
+
+  /**
+   * Removes the path of the parent from a child path.
+   *
+   * <p>Only the path component of each URI is compared, and the match is made on a path
+   * component boundary, so {@code /a/b} is not treated as a parent of {@code /a/bcd}.
+   *
+   * @param parentPath the directory the result is made relative to
+   * @param childPath a path expected to be located strictly below {@code parentPath}
+   * @return the part of {@code childPath} below {@code parentPath} without a leading separator,
+   *         or {@code null} if {@code childPath} is not strictly below {@code parentPath}
+   *         (including when both paths are equal)
+   */
+  private String extractSubPath(Path parentPath, Path childPath) {
+    String parentPathStr = parentPath.toUri().getPath();
+    String childPathStr = childPath.toUri().getPath();
+
+    // Build the prefix including the separator; the root path already ends with one.
+    String parentPrefix = parentPathStr.endsWith("/") ? parentPathStr : parentPathStr + "/";
+
+    // The child must extend beyond the prefix, otherwise there is no sub path to return.
+    if (childPathStr.length() <= parentPrefix.length() || !childPathStr.startsWith(parentPrefix)) {
+      return null;
+    }
+
+    return childPathStr.substring(parentPrefix.length());
+  }
+
+  /**
+   * Builds a file name in which the last of four dash-separated parts is replaced by a
+   * formatted sequence number.
+   *
+   * @param path Path whose final name component is split on "-"
+   * @param seq sequence number
+   * @param nf Number format used to render {@code seq}
+   * @return the first three name parts and the formatted sequence, joined by "-"
+   * @throws java.io.IOException if the name does not consist of exactly four parts
+   */
+  private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException {
+    final String[] nameParts = path.getName().split("-");
+    if (nameParts.length != 4) {
+      throw new IOException("Wrong result file name:" + path);
+    }
+
+    // Keep the first three parts untouched and append the new sequence as the fourth.
+    StringBuilder renamed = new StringBuilder();
+    for (int i = 0; i < 3; i++) {
+      renamed.append(nameParts[i]).append("-");
+    }
+    renamed.append(nf.format(seq));
+    return renamed.toString();
+  }
+
+  /**
+   * Checks that no file is left below the staging path and removes the sub-directories
+   * that were verified to contain no files.
+   *
+   * @param fs FileSystem
+   * @param stagingPath The staging directory to inspect
+   * @return {@code false} as soon as a remaining file is found, {@code true} otherwise
+   * @throws java.io.IOException if a file system call fails
+   */
+  private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException {
+    FileStatus[] remaining = fs.listStatus(stagingPath);
+    if (remaining == null) {
+      return true;
+    }
+
+    for (FileStatus entry : remaining) {
+      if (entry.isFile()) {
+        LOG.error("There are some unmoved files in staging dir:" + entry.getPath());
+        return false;
+      }
+
+      // A sub-directory is removed (non-recursively) only after its whole subtree was verified.
+      if (!verifyAllFileMoved(fs, entry.getPath())) {
+        return false;
+      }
+      fs.delete(entry.getPath(), false);
+    }
+
+    return true;
+  }
+
+  /**
+   * Walks the staging directory recursively and fills the rename map with the staged
+   * directories that have to be renamed to the final output directory.
+   *
+   * <p>For every staged directory the matching backup directory below {@code oldTableDir} is
+   * created if missing. A staged directory without sub-directories is registered in
+   * {@code renameDirs} (staged path to final path); for a staged directory that has
+   * sub-directories the matching final directory is created if missing. This method itself
+   * neither renames nor deletes anything.
+   *
+   * @param fs FileSystem
+   * @param stagingPath the staged directory whose children are visited
+   * @param outputPath the final output directory
+   * @param stagingParentPathString the staging root, replaced literally in every staged path to
+   *        derive the backup and final paths
+   * @param renameDirs output map from staged directory to final directory
+   * @param oldTableDir root of the backup location
+   * @throws java.io.IOException if a file system call fails
+   */
+  private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath,
+                                         String stagingParentPathString,
+                                         Map<Path, Path> renameDirs, Path oldTableDir) throws IOException {
+    FileStatus[] files = fs.listStatus(stagingPath);
+
+    for (FileStatus eachFile : files) {
+      if (eachFile.isDirectory()) {
+        Path oldPath = eachFile.getPath();
+
+        // Make recover directory. The replacement is literal: a path must not be treated as a
+        // regular expression, nor the target as a replacement pattern.
+        String recoverPathString = oldPath.toString().replace(stagingParentPathString,
+            oldTableDir.toString());
+        Path recoveryPath = new Path(recoverPathString);
+        if (!fs.exists(recoveryPath)) {
+          fs.mkdirs(recoveryPath);
+        }
+
+        visitPartitionedDirectory(fs, oldPath, outputPath, stagingParentPathString,
+            renameDirs, oldTableDir);
+        // Find last order partition for renaming
+        String newPathString = oldPath.toString().replace(stagingParentPathString,
+            outputPath.toString());
+        Path newPath = new Path(newPathString);
+        // isLeafDirectory() reports true when the directory contains a sub-directory, so the
+        // negated call selects the directories that have no sub-directories.
+        if (!isLeafDirectory(fs, oldPath)) {
+          renameDirs.put(oldPath, newPath);
+        } else {
+          if (!fs.exists(newPath)) {
+            fs.mkdirs(newPath);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Reports whether the given directory contains at least one sub-directory.
+   *
+   * <p>NOTE: the returned value is the opposite of what the method name suggests:
+   * {@code true} means the directory has a sub-directory. The name is kept so that the
+   * existing caller, which negates the result, continues to work.
+   *
+   * @param fs FileSystem used to list {@code path}
+   * @param path the directory to inspect
+   * @return {@code true} if any direct child of {@code path} is a directory
+   * @throws IOException if listing {@code path} fails
+   */
+  private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException {
+    for (FileStatus file : fs.listStatus(path)) {
+      // The listing already carries the directory flag; no extra lookup per child is needed.
+      if (file.isDirectory()) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
index 1846ed6..66c7f13 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
@@ -82,7 +82,7 @@ public class HashShuffleAppenderManager {
         if (!fs.exists(dataFile.getParent())) {
           fs.mkdirs(dataFile.getParent());
         }
-        FileAppender appender = (FileAppender)((FileStorageManager) 
TableSpaceManager.getFileStorageManager(tajoConf))
+        FileAppender appender = (FileAppender)((FileTablespace) 
TableSpaceManager.getFileStorageManager(tajoConf))
             .getAppender(meta, outSchema, dataFile);
         appender.enableStats();
         appender.init();

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index 779f908..75ad0d5 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -122,7 +122,7 @@ public class TestCompressionStorages {
 
     String fileName = "Compression_" + codec.getSimpleName();
     Path tablePath = new Path(testDir, fileName);
-    Appender appender = ((FileStorageManager) 
TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, 
tablePath);
+    Appender appender = ((FileTablespace) 
TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, 
tablePath);
     appender.enableStats();
 
     appender.init();

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
index 41c6c67..76b5c2f 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
@@ -80,7 +80,7 @@ public class TestFileStorageManager {
 
     Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", 
"table.csv");
     fs.mkdirs(path.getParent());
-    FileStorageManager fileStorageManager = (FileStorageManager) 
TableSpaceManager.getFileStorageManager(conf);
+    FileTablespace fileStorageManager = (FileTablespace) 
TableSpaceManager.getFileStorageManager(conf);
     assertEquals(fs.getUri(), fileStorageManager.getFileSystem().getUri());
 
                Appender appender = fileStorageManager.getAppender(meta, 
schema, path);
@@ -127,7 +127,7 @@ public class TestFileStorageManager {
       }
 
       assertTrue(fs.exists(tablePath));
-      FileStorageManager sm = (FileStorageManager) 
TableSpaceManager.getFileStorageManager(tajoConf);
+      FileTablespace sm = (FileTablespace) 
TableSpaceManager.getFileStorageManager(tajoConf);
       assertEquals(fs.getUri(), sm.getFileSystem().getUri());
 
       Schema schema = new Schema();
@@ -181,7 +181,7 @@ public class TestFileStorageManager {
         DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADl);
       }
       assertTrue(fs.exists(tablePath));
-      FileStorageManager sm = (FileStorageManager) 
TableSpaceManager.getFileStorageManager(tajoConf);
+      FileTablespace sm = (FileTablespace) 
TableSpaceManager.getFileStorageManager(tajoConf);
       assertEquals(fs.getUri(), sm.getFileSystem().getUri());
 
       Schema schema = new Schema();
@@ -220,11 +220,11 @@ public class TestFileStorageManager {
 
     try {
       /* Local FileSystem */
-      FileStorageManager sm = (FileStorageManager) 
TableSpaceManager.getStorageManager(conf, "CSV");
+      FileTablespace sm = (FileTablespace) 
TableSpaceManager.getStorageManager(conf, "CSV");
       assertEquals(fs.getUri(), sm.getFileSystem().getUri());
 
       /* Distributed FileSystem */
-      sm = (FileStorageManager) TableSpaceManager.getStorageManager(tajoConf, 
"CSV");
+      sm = (FileTablespace) TableSpaceManager.getStorageManager(tajoConf, 
"CSV");
       assertNotEquals(fs.getUri(), sm.getFileSystem().getUri());
       assertEquals(cluster.getFileSystem().getUri(), 
sm.getFileSystem().getUri());
     } finally {

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
index 1222fae..235bfaa 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
@@ -50,14 +50,14 @@ public class TestFileSystems {
 
   private static String TEST_PATH = "target/test-data/TestFileSystem";
   private TajoConf conf;
-  private FileStorageManager sm;
+  private FileTablespace sm;
   private FileSystem fs;
   private Path testDir;
 
   public TestFileSystems(FileSystem fs) throws IOException {
     this.fs = fs;
     this.conf = new TajoConf(fs.getConf());
-    sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf);
+    sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf);
     testDir = getTestDir(this.fs, TEST_PATH);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
index 82acaf3..04f8a90 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -49,7 +49,7 @@ import static org.junit.Assert.*;
 @RunWith(Parameterized.class)
 public class TestMergeScanner {
   private TajoConf conf;
-  StorageManager sm;
+  Tablespace sm;
   private static String TEST_PATH = "target/test-data/TestMergeScanner";
 
   private static String TEST_MULTIPLE_FILES_AVRO_SCHEMA =

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index 286902a..4ca61e4 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -155,7 +155,7 @@ public class TestStorages {
 
       TableMeta meta = CatalogUtil.newTableMeta(storeType);
       Path tablePath = new Path(testDir, "Splitable.data");
-      FileStorageManager sm = (FileStorageManager) 
TableSpaceManager.getFileStorageManager(conf);
+      FileTablespace sm = (FileTablespace) 
TableSpaceManager.getFileStorageManager(conf);
       Appender appender = sm.getAppender(meta, schema, tablePath);
       appender.enableStats();
       appender.init();
@@ -210,7 +210,7 @@ public class TestStorages {
 
       TableMeta meta = CatalogUtil.newTableMeta(storeType);
       Path tablePath = new Path(testDir, "Splitable.data");
-      FileStorageManager sm = (FileStorageManager) 
TableSpaceManager.getFileStorageManager(conf);
+      FileTablespace sm = (FileTablespace) 
TableSpaceManager.getFileStorageManager(conf);
       Appender appender = sm.getAppender(meta, schema, tablePath);
       appender.enableStats();
       appender.init();
@@ -271,7 +271,7 @@ public class TestStorages {
     }
 
     Path tablePath = new Path(testDir, "testProjection.data");
-    FileStorageManager sm = (FileStorageManager) 
TableSpaceManager.getFileStorageManager(conf);
+    FileTablespace sm = (FileTablespace) 
TableSpaceManager.getFileStorageManager(conf);
     Appender appender = sm.getAppender(meta, schema, tablePath);
     appender.init();
     int tupleNum = 10000;
@@ -347,7 +347,7 @@ public class TestStorages {
       meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path);
     }
 
-    FileStorageManager sm = (FileStorageManager) 
TableSpaceManager.getFileStorageManager(conf);
+    FileTablespace sm = (FileTablespace) 
TableSpaceManager.getFileStorageManager(conf);
     Path tablePath = new Path(testDir, "testVariousTypes.data");
     Appender appender = sm.getAppender(meta, schema, tablePath);
     appender.init();
@@ -425,7 +425,7 @@ public class TestStorages {
     }
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
-    FileStorageManager sm = (FileStorageManager) 
TableSpaceManager.getFileStorageManager(conf);
+    FileTablespace sm = (FileTablespace) 
TableSpaceManager.getFileStorageManager(conf);
     Appender appender = sm.getAppender(meta, schema, tablePath);
     appender.init();
 
@@ -513,7 +513,7 @@ public class TestStorages {
     meta.putOption(StorageConstants.CSVFILE_SERDE, 
TextSerializerDeserializer.class.getName());
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
-    FileStorageManager sm = (FileStorageManager) 
TableSpaceManager.getFileStorageManager(conf);
+    FileTablespace sm = (FileTablespace) 
TableSpaceManager.getFileStorageManager(conf);
     Appender appender = sm.getAppender(meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -583,7 +583,7 @@ public class TestStorages {
     meta.putOption(StorageConstants.RCFILE_SERDE, 
BinarySerializerDeserializer.class.getName());
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
-    FileStorageManager sm = (FileStorageManager) 
TableSpaceManager.getFileStorageManager(conf);
+    FileTablespace sm = (FileTablespace) 
TableSpaceManager.getFileStorageManager(conf);
     Appender appender = sm.getAppender(meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -653,7 +653,7 @@ public class TestStorages {
     meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, 
TextSerializerDeserializer.class.getName());
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
-    FileStorageManager sm = (FileStorageManager) 
TableSpaceManager.getFileStorageManager(conf);
+    FileTablespace sm = (FileTablespace) 
TableSpaceManager.getFileStorageManager(conf);
     Appender appender = sm.getAppender(meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -727,7 +727,7 @@ public class TestStorages {
     meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, 
BinarySerializerDeserializer.class.getName());
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
-    FileStorageManager sm = (FileStorageManager) 
TableSpaceManager.getFileStorageManager(conf);
+    FileTablespace sm = (FileTablespace) 
TableSpaceManager.getFileStorageManager(conf);
     Appender appender = sm.getAppender(meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -789,7 +789,7 @@ public class TestStorages {
       TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
 
       Path tablePath = new Path(testDir, "testTime.data");
-      FileStorageManager sm = (FileStorageManager) 
TableSpaceManager.getFileStorageManager(conf);
+      FileTablespace sm = (FileTablespace) 
TableSpaceManager.getFileStorageManager(conf);
       Appender appender = sm.getAppender(meta, schema, tablePath);
       appender.init();
 
@@ -831,7 +831,7 @@ public class TestStorages {
 
     TableMeta meta = CatalogUtil.newTableMeta(storeType);
     Path tablePath = new Path(testDir, "Seekable.data");
-    FileStorageManager sm = (FileStorageManager) 
TableSpaceManager.getFileStorageManager(conf);
+    FileTablespace sm = (FileTablespace) 
TableSpaceManager.getFileStorageManager(conf);
     FileAppender appender = (FileAppender) sm.getAppender(meta, schema, 
tablePath);
     appender.enableStats();
     appender.init();
@@ -921,7 +921,7 @@ public class TestStorages {
       conf.setInt(RawFile.WRITE_BUFFER_SIZE, record + headerSize);
     }
 
-    FileStorageManager sm = (FileStorageManager) 
TableSpaceManager.getFileStorageManager(conf);
+    FileTablespace sm = (FileTablespace) 
TableSpaceManager.getFileStorageManager(conf);
     Path tablePath = new Path(testDir, "testMaxValue.data");
     Appender appender = sm.getAppender(meta, schema, tablePath);
 
@@ -977,7 +977,7 @@ public class TestStorages {
     meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
 
     Path tablePath = new Path(testDir, "testLessThanSchemaSize.data");
-    FileStorageManager sm = (FileStorageManager) 
TableSpaceManager.getFileStorageManager(conf);
+    FileTablespace sm = (FileTablespace) 
TableSpaceManager.getFileStorageManager(conf);
     Appender appender = sm.getAppender(meta, dataSchema, tablePath);
     appender.init();
 
@@ -1041,7 +1041,7 @@ public class TestStorages {
     meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
 
     Path tablePath = new Path(testDir, "test_storetype_oversize.data");
-    FileStorageManager sm = (FileStorageManager) 
TableSpaceManager.getFileStorageManager(conf);
+    FileTablespace sm = (FileTablespace) 
TableSpaceManager.getFileStorageManager(conf);
     Appender appender = sm.getAppender(meta, dataSchema, tablePath);
     appender.init();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
index ae0fd58..e15c474 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
@@ -89,7 +89,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testFindValue_" + storeType);
-    Appender appender = ((FileStorageManager) 
TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, 
tablePath);
+    Appender appender = ((FileTablespace) 
TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, 
tablePath);
     appender.init();
     Tuple tuple;
     for (int i = 0; i < TUPLE_NUM; i++) {
@@ -177,7 +177,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testBuildIndexWithAppender_" + 
storeType);
-    FileAppender appender = (FileAppender) ((FileStorageManager) 
TableSpaceManager.getFileStorageManager(conf))
+    FileAppender appender = (FileAppender) ((FileTablespace) 
TableSpaceManager.getFileStorageManager(conf))
         .getAppender(meta, schema, tablePath);
     appender.init();
 
@@ -256,7 +256,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = StorageUtil.concatPath(testDir, "testFindOmittedValue_" + 
storeType);
-    Appender appender = ((FileStorageManager) 
TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, 
tablePath);
+    Appender appender = ((FileTablespace) 
TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, 
tablePath);
     appender.init();
     Tuple tuple;
     for (int i = 0; i < TUPLE_NUM; i += 2) {
@@ -326,7 +326,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testFindNextKeyValue_" + storeType);
-    Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+    Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
     for (int i = 0; i < TUPLE_NUM; i++) {
@@ -416,7 +416,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testFindNextKeyOmittedValue_" + storeType);
-    Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+    Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
         .getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
@@ -496,7 +496,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testFindMinValue" + storeType);
-    Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+    Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
     appender.init();
 
     Tuple tuple;
@@ -579,7 +579,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testMinMax_" + storeType);
-    Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+    Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
     for (int i = 5; i < TUPLE_NUM; i += 2) {
@@ -683,7 +683,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testConcurrentAccess_" + storeType);
-    Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+    Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
     appender.init();
 
     Tuple tuple;
@@ -763,7 +763,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testFindValueDescOrder_" + storeType);
-    Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+    Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
     appender.init();
 
     Tuple tuple;
@@ -854,7 +854,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType);
-    Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+    Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
     appender.init();
 
     Tuple tuple;

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
index cebeeb2..09191f4 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
@@ -77,7 +77,7 @@ public class TestSingleCSVFileBSTIndex {
     Path tablePath = StorageUtil.concatPath(testDir, "testFindValueInSingleCSV", "table.csv");
     fs.mkdirs(tablePath.getParent());
 
-    Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+    Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
     for (int i = 0; i < TUPLE_NUM; i++) {
@@ -166,7 +166,7 @@ public class TestSingleCSVFileBSTIndex {
     Path tablePath = StorageUtil.concatPath(testDir, "testFindNextKeyValueInSingleCSV",
         "table1.csv");
     fs.mkdirs(tablePath.getParent());
-    Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+    Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
     for(int i = 0 ; i < TUPLE_NUM; i ++ ) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
index da49d1d..6a9e7ce 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
@@ -28,11 +28,11 @@
   <!-- Storage Manager Configuration -->
   <property>
     <name>tajo.storage.manager.hdfs.class</name>
-    <value>org.apache.tajo.storage.FileStorageManager</value>
+    <value>org.apache.tajo.storage.FileTablespace</value>
   </property>
   <property>
     <name>tajo.storage.manager.hbase.class</name>
-    <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value>
+    <value>org.apache.tajo.storage.hbase.HBaseTablespace</value>
   </property>
 
   <!--- Registered Scanner Handler -->

Reply via email to