http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
index df73448,0000000..060bf16
mode 100644,000000..100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
@@@ -1,854 -1,0 +1,882 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.tajo.storage;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Preconditions;
 +import com.google.common.collect.Lists;
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.*;
 +import org.apache.hadoop.hdfs.DFSConfigKeys;
 +import org.apache.hadoop.hdfs.DistributedFileSystem;
 +import org.apache.tajo.OverridableConf;
 +import org.apache.tajo.QueryUnitAttemptId;
 +import org.apache.tajo.TajoConstants;
 +import org.apache.tajo.catalog.*;
 +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 +import org.apache.tajo.catalog.statistics.TableStats;
 +import org.apache.tajo.conf.TajoConf;
 +import org.apache.tajo.plan.logical.LogicalNode;
 +import org.apache.tajo.plan.logical.ScanNode;
 +import org.apache.tajo.storage.fragment.FileFragment;
 +import org.apache.tajo.storage.fragment.Fragment;
 +import org.apache.tajo.util.Bytes;
 +
 +import java.io.IOException;
 +import java.text.NumberFormat;
 +import java.util.*;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +public class FileStorageManager extends StorageManager {
 +  private final Log LOG = LogFactory.getLog(FileStorageManager.class);
 +
 +  static final String OUTPUT_FILE_PREFIX="part-";
 +  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY =
 +      new ThreadLocal<NumberFormat>() {
 +        @Override
 +        public NumberFormat initialValue() {
 +          NumberFormat fmt = NumberFormat.getInstance();
 +          fmt.setGroupingUsed(false);
 +          fmt.setMinimumIntegerDigits(2);
 +          return fmt;
 +        }
 +      };
 +  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_TASK =
 +      new ThreadLocal<NumberFormat>() {
 +        @Override
 +        public NumberFormat initialValue() {
 +          NumberFormat fmt = NumberFormat.getInstance();
 +          fmt.setGroupingUsed(false);
 +          fmt.setMinimumIntegerDigits(6);
 +          return fmt;
 +        }
 +      };
 +
 +  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SEQ =
 +      new ThreadLocal<NumberFormat>() {
 +        @Override
 +        public NumberFormat initialValue() {
 +          NumberFormat fmt = NumberFormat.getInstance();
 +          fmt.setGroupingUsed(false);
 +          fmt.setMinimumIntegerDigits(3);
 +          return fmt;
 +        }
 +      };
 +
 +  protected FileSystem fs;
 +  protected Path tableBaseDir;
 +  protected boolean blocksMetadataEnabled;
 +  private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0));
 +
 +  public FileStorageManager(StoreType storeType) {
 +    super(storeType);
 +  }
 +
 +  @Override
 +  protected void storageInit() throws IOException {
 +    this.tableBaseDir = TajoConf.getWarehouseDir(conf);
 +    this.fs = tableBaseDir.getFileSystem(conf);
 +    this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
 +        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
 +    if (!this.blocksMetadataEnabled)
 +      LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
 +  }
 +
 +  public Scanner getFileScanner(TableMeta meta, Schema schema, Path path)
 +      throws IOException {
 +    FileSystem fs = path.getFileSystem(conf);
 +    FileStatus status = fs.getFileStatus(path);
 +    return getFileScanner(meta, schema, path, status);
 +  }
 +
 +  public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status)
 +      throws IOException {
 +    Fragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
 +    return getScanner(meta, schema, fragment);
 +  }
 +
 +  public FileSystem getFileSystem() {
 +    return this.fs;
 +  }
 +
 +  public Path getWarehouseDir() {
 +    return this.tableBaseDir;
 +  }
 +
 +  public void delete(Path tablePath) throws IOException {
 +    FileSystem fs = tablePath.getFileSystem(conf);
 +    fs.delete(tablePath, true);
 +  }
 +
 +  public boolean exists(Path path) throws IOException {
 +    FileSystem fileSystem = path.getFileSystem(conf);
 +    return fileSystem.exists(path);
 +  }
 +
 +  /**
 +   * This method deletes only data contained in the given path.
 +   *
 +   * @param path The path in which data are deleted.
 +   * @throws IOException
 +   */
 +  public void deleteData(Path path) throws IOException {
 +    FileSystem fileSystem = path.getFileSystem(conf);
 +    FileStatus[] fileLists = fileSystem.listStatus(path);
 +    for (FileStatus status : fileLists) {
 +      fileSystem.delete(status.getPath(), true);
 +    }
 +  }
 +
 +  public Path getTablePath(String tableName) {
 +    return new Path(tableBaseDir, tableName);
 +  }
 +
 +  @VisibleForTesting
 +  public Appender getAppender(TableMeta meta, Schema schema, Path filePath)
 +      throws IOException {
 +    return getAppender(null, null, meta, schema, filePath);
 +  }
 +
 +  public FileFragment[] split(String tableName) throws IOException {
 +    Path tablePath = new Path(tableBaseDir, tableName);
 +    return split(tableName, tablePath, fs.getDefaultBlockSize());
 +  }
 +
 +  public FileFragment[] split(String tableName, long fragmentSize) throws IOException {
 +    Path tablePath = new Path(tableBaseDir, tableName);
 +    return split(tableName, tablePath, fragmentSize);
 +  }
 +
 +  public FileFragment[] splitBroadcastTable(Path tablePath) throws IOException {
 +    FileSystem fs = tablePath.getFileSystem(conf);
 +    List<FileFragment> listTablets = new ArrayList<FileFragment>();
 +    FileFragment tablet;
 +
 +    FileStatus[] fileLists = fs.listStatus(tablePath);
 +    for (FileStatus file : fileLists) {
 +      tablet = new FileFragment(tablePath.getName(), file.getPath(), 0, file.getLen());
 +      listTablets.add(tablet);
 +    }
 +
 +    FileFragment[] tablets = new FileFragment[listTablets.size()];
 +    listTablets.toArray(tablets);
 +
 +    return tablets;
 +  }
 +
 +  public FileFragment[] split(Path tablePath) throws IOException {
 +    FileSystem fs = tablePath.getFileSystem(conf);
 +    return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize());
 +  }
 +
 +  public FileFragment[] split(String tableName, Path tablePath) throws IOException {
 +    return split(tableName, tablePath, fs.getDefaultBlockSize());
 +  }
 +
 +  private FileFragment[] split(String tableName, Path tablePath, long size)
 +      throws IOException {
 +    FileSystem fs = tablePath.getFileSystem(conf);
 +
 +    long defaultBlockSize = size;
 +    List<FileFragment> listTablets = new ArrayList<FileFragment>();
 +    FileFragment tablet;
 +
 +    FileStatus[] fileLists = fs.listStatus(tablePath);
 +    for (FileStatus file : fileLists) {
 +      long remainFileSize = file.getLen();
 +      long start = 0;
 +      if (remainFileSize > defaultBlockSize) {
 +        while (remainFileSize > defaultBlockSize) {
 +          tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
 +          listTablets.add(tablet);
 +          start += defaultBlockSize;
 +          remainFileSize -= defaultBlockSize;
 +        }
 +        listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
 +      } else {
 +        listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
 +      }
 +    }
 +
 +    FileFragment[] tablets = new FileFragment[listTablets.size()];
 +    listTablets.toArray(tablets);
 +
 +    return tablets;
 +  }
 +
 +  public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
 +                                       Path tablePath, long size)
 +      throws IOException {
 +    FileSystem fs = tablePath.getFileSystem(conf);
 +
 +    long defaultBlockSize = size;
 +    List<FileFragment> listTablets = new ArrayList<FileFragment>();
 +    FileFragment tablet;
 +
 +    FileStatus[] fileLists = fs.listStatus(tablePath);
 +    for (FileStatus file : fileLists) {
 +      long remainFileSize = file.getLen();
 +      long start = 0;
 +      if (remainFileSize > defaultBlockSize) {
 +        while (remainFileSize > defaultBlockSize) {
 +          tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
 +          listTablets.add(tablet);
 +          start += defaultBlockSize;
 +          remainFileSize -= defaultBlockSize;
 +        }
 +        listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
 +      } else {
 +        listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
 +      }
 +    }
 +
 +    FileFragment[] tablets = new FileFragment[listTablets.size()];
 +    listTablets.toArray(tablets);
 +
 +    return tablets;
 +  }
 +
 +  public long calculateSize(Path tablePath) throws IOException {
 +    FileSystem fs = tablePath.getFileSystem(conf);
 +    long totalSize = 0;
 +
 +    if (fs.exists(tablePath)) {
 +      totalSize = fs.getContentSummary(tablePath).getLength();
 +    }
 +
 +    return totalSize;
 +  }
 +
 +  /////////////////////////////////////////////////////////////////////////////
 +  // FileInputFormat Area
 +  /////////////////////////////////////////////////////////////////////////////
- 
-   public static final PathFilter hiddenFileFilter = new PathFilter() {
-     public boolean accept(Path p) {
-       String name = p.getName();
-       return !name.startsWith("_") && !name.startsWith(".");
-     }
-   };
- 
 +  public Path getAppenderFilePath(QueryUnitAttemptId taskAttemptId, Path workDir) {
 +    if (taskAttemptId == null) {
 +      // For testcase
 +      return workDir;
 +    }
 +    // The final result of a task will be written in a file named part-ss-nnnnnn-sss,
 +    // where ss is the subquery (execution block) id, nnnnnn is the task id, and sss is
 +    // the sequence number (e.g., task 42 of execution block 3 yields part-03-000042-000).
 +    Path outFilePath = StorageUtil.concatPath(workDir, TajoConstants.RESULT_DIR_NAME,
 +        OUTPUT_FILE_PREFIX +
 +            OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskAttemptId.getQueryUnitId().getExecutionBlockId().getId()) + "-" +
 +            OUTPUT_FILE_FORMAT_TASK.get().format(taskAttemptId.getQueryUnitId().getId()) + "-" +
 +            OUTPUT_FILE_FORMAT_SEQ.get().format(0));
 +    LOG.info("Output File Path: " + outFilePath);
 +
 +    return outFilePath;
 +  }
 +
 +  /**
 +   * Proxy PathFilter that accepts a path only if all filters given in the
 +   * constructor do. Used by the listPaths() to apply the built-in
 +   * hiddenFileFilter together with a user provided one (if any).
 +   */
 +  private static class MultiPathFilter implements PathFilter {
 +    private List<PathFilter> filters;
 +
 +    public MultiPathFilter(List<PathFilter> filters) {
 +      this.filters = filters;
 +    }
 +
 +    public boolean accept(Path path) {
 +      for (PathFilter filter : filters) {
 +        if (!filter.accept(path)) {
 +          return false;
 +        }
 +      }
 +      return true;
 +    }
 +  }
 +
 +  /**
 +   * List input directories.
 +   * Subclasses may override to, e.g., select only files matching a regular
 +   * expression.
 +   *
 +   * @return array of FileStatus objects
 +   * @throws IOException if zero items.
 +   */
 +  protected List<FileStatus> listStatus(Path... dirs) throws IOException {
 +    List<FileStatus> result = new ArrayList<FileStatus>();
 +    if (dirs.length == 0) {
 +      throw new IOException("No input paths specified in job");
 +    }
 +
 +    List<IOException> errors = new ArrayList<IOException>();
 +
 +    // creates a MultiPathFilter with the hiddenFileFilter and the
 +    // user provided one (if any).
 +    List<PathFilter> filters = new ArrayList<PathFilter>();
 +    filters.add(hiddenFileFilter);
 +
 +    PathFilter inputFilter = new MultiPathFilter(filters);
 +
 +    for (int i = 0; i < dirs.length; ++i) {
 +      Path p = dirs[i];
 +
 +      FileSystem fs = p.getFileSystem(conf);
 +      FileStatus[] matches = fs.globStatus(p, inputFilter);
 +      if (matches == null) {
 +        errors.add(new IOException("Input path does not exist: " + p));
 +      } else if (matches.length == 0) {
 +        errors.add(new IOException("Input Pattern " + p + " matches 0 
files"));
 +      } else {
 +        for (FileStatus globStat : matches) {
 +          if (globStat.isDirectory()) {
 +            for (FileStatus stat : fs.listStatus(globStat.getPath(),
 +                inputFilter)) {
 +              result.add(stat);
 +            }
 +          } else {
 +            result.add(globStat);
 +          }
 +        }
 +      }
 +    }
 +
 +    if (!errors.isEmpty()) {
 +      throw new InvalidInputException(errors);
 +    }
 +    LOG.info("Total input paths to process : " + result.size());
 +    return result;
 +  }
 +
 +  /**
 +   * Is the given file splittable? Usually true, but if the file is
 +   * stream compressed, it will not be.
 +   * <p/>
 +   * <code>FileInputFormat</code> implementations can override this and return
 +   * <code>false</code> to ensure that individual input files are never split up,
 +   * so that Mappers process entire files.
 +   *
 +   * @param path the file path to check
 +   * @param status the file status, used to get the file length
 +   * @return whether this file is splittable
 +   */
 +  protected boolean isSplittable(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException {
 +    Scanner scanner = getFileScanner(meta, schema, path, status);
 +    boolean split = scanner.isSplittable();
 +    scanner.close();
 +    return split;
 +  }
 +
 +  private static final double SPLIT_SLOP = 1.1;   // 10% slop
 +
 +  protected int getBlockIndex(BlockLocation[] blkLocations,
 +                              long offset) {
 +    for (int i = 0; i < blkLocations.length; i++) {
 +      // is the offset inside this block?
 +      if ((blkLocations[i].getOffset() <= offset) &&
 +          (offset < blkLocations[i].getOffset() + 
blkLocations[i].getLength())) {
 +        return i;
 +      }
 +    }
 +    BlockLocation last = blkLocations[blkLocations.length - 1];
 +    long fileLength = last.getOffset() + last.getLength() - 1;
 +    throw new IllegalArgumentException("Offset " + offset +
 +        " is outside of file (0.." +
 +        fileLength + ")");
 +  }
 +
 +  /**
 +   * A factory that makes the split for this class. It can be overridden
 +   * by sub-classes to make sub-types
 +   */
 +  protected FileFragment makeSplit(String fragmentId, Path file, long start, long length) {
 +    return new FileFragment(fragmentId, file, start, length);
 +  }
 +
 +  protected FileFragment makeSplit(String fragmentId, Path file, long start, long length,
 +                                   String[] hosts) {
 +    return new FileFragment(fragmentId, file, start, length, hosts);
 +  }
 +
 +  protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation blockLocation)
 +      throws IOException {
 +    return new FileFragment(fragmentId, file, blockLocation);
 +  }
 +
 +  // For non-splittable files, e.g., a gzip-compressed text file.
 +  protected FileFragment makeNonSplit(String fragmentId, Path file, long start, long length,
 +                                      BlockLocation[] blkLocations) throws IOException {
 +
 +    Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
 +    for (BlockLocation blockLocation : blkLocations) {
 +      for (String host : blockLocation.getHosts()) {
 +        if (hostsBlockMap.containsKey(host)) {
 +          hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
 +        } else {
 +          hostsBlockMap.put(host, 1);
 +        }
 +      }
 +    }
 +
 +    List<Map.Entry<String, Integer>> entries = new 
ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet());
 +    Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
 +
 +      @Override
 +      public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, 
Integer> v2) {
 +        return v1.getValue().compareTo(v2.getValue());
 +      }
 +    });
 +
 +    String[] hosts = new String[blkLocations[0].getHosts().length];
 +
 +    for (int i = 0; i < hosts.length; i++) {
 +      Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - 
i);
 +      hosts[i] = entry.getKey();
 +    }
 +    return new FileFragment(fragmentId, file, start, length, hosts);
 +  }
 +
 +  /**
 +   * Get the minimum split size
 +   *
 +   * @return the minimum number of bytes that can be in a split
 +   */
 +  public long getMinSplitSize() {
 +    return conf.getLongVar(TajoConf.ConfVars.MINIMUM_SPLIT_SIZE);
 +  }
 +
 +  /**
 +   * Converts byte-encoded VolumeIds into integer disk ids.
 +   */
 +  private int[] getDiskIds(VolumeId[] volumeIds) {
 +    int[] diskIds = new int[volumeIds.length];
 +    for (int i = 0; i < volumeIds.length; i++) {
 +      int diskId = -1;
 +      if (volumeIds[i] != null && volumeIds[i].hashCode() > 0) {
 +        diskId = volumeIds[i].hashCode() - zeroVolumeId.hashCode();
 +      }
 +      diskIds[i] = diskId;
 +    }
 +    return diskIds;
 +  }
 +
 +  /**
 +   * Generates a map from each host to the set of disk volume ids it serves.
 +   *
 +   */
 +  private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) {
 +    Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
 +    for (FileFragment frag : frags) {
 +      String[] hosts = frag.getHosts();
 +      int[] diskIds = frag.getDiskIds();
 +      for (int i = 0; i < hosts.length; i++) {
 +        Set<Integer> volumeList = volumeMap.get(hosts[i]);
 +        if (volumeList == null) {
 +          volumeList = new HashSet<Integer>();
 +          volumeMap.put(hosts[i], volumeList);
 +        }
 +
 +        if (diskIds.length > 0 && diskIds[i] > -1) {
 +          volumeList.add(diskIds[i]);
 +        }
 +      }
 +    }
 +
 +    return volumeMap;
 +  }
 +  /**
 +   * Generate the list of files and make them into FileSplits.
 +   *
 +   * @throws IOException
 +   */
 +  public List<Fragment> getSplits(String tableName, TableMeta meta, Schema 
schema, Path... inputs)
 +      throws IOException {
 +    // generate splits'
 +
 +    List<Fragment> splits = Lists.newArrayList();
 +    List<Fragment> volumeSplits = Lists.newArrayList();
 +    List<BlockLocation> blockLocations = Lists.newArrayList();
 +
 +    for (Path p : inputs) {
 +      FileSystem fs = p.getFileSystem(conf);
++
 +      ArrayList<FileStatus> files = Lists.newArrayList();
 +      if (fs.isFile(p)) {
 +        files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
 +      } else {
 +        files.addAll(listStatus(p));
 +      }
 +
 +      int previousSplitSize = splits.size();
 +      for (FileStatus file : files) {
 +        Path path = file.getPath();
 +        long length = file.getLen();
 +        if (length > 0) {
 +          // Get locations of blocks of file
 +          BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
 +          boolean splittable = isSplittable(meta, schema, path, file);
 +          if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
 +
 +            if (splittable) {
 +              for (BlockLocation blockLocation : blkLocations) {
 +                volumeSplits.add(makeSplit(tableName, path, blockLocation));
 +              }
 +              blockLocations.addAll(Arrays.asList(blkLocations));
 +
 +            } else { // Non splittable
 +              long blockSize = blkLocations[0].getLength();
 +              if (blockSize >= length) {
 +                blockLocations.addAll(Arrays.asList(blkLocations));
 +                for (BlockLocation blockLocation : blkLocations) {
 +                  volumeSplits.add(makeSplit(tableName, path, blockLocation));
 +                }
 +              } else {
 +                splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
 +              }
 +            }
 +
 +          } else {
 +            if (splittable) {
 +
 +              long minSize = Math.max(getMinSplitSize(), 1);
 +
 +              long blockSize = file.getBlockSize(); // the S3N REST API reports a block size, but there is only one block location
 +              long splitSize = Math.max(minSize, blockSize);
 +              long bytesRemaining = length;
 +
 +              // for s3
 +              while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
 +                int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
 +                splits.add(makeSplit(tableName, path, length - bytesRemaining, splitSize,
 +                    blkLocations[blkIndex].getHosts()));
 +                bytesRemaining -= splitSize;
 +              }
 +              if (bytesRemaining > 0) {
 +                int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
 +                splits.add(makeSplit(tableName, path, length - bytesRemaining, bytesRemaining,
 +                    blkLocations[blkIndex].getHosts()));
 +              }
 +            } else { // Non splittable
 +              splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
 +            }
 +          }
 +        } else {
 +          //for zero length files
 +          splits.add(makeSplit(tableName, path, 0, length));
 +        }
 +      }
 +      if(LOG.isDebugEnabled()){
 +        LOG.debug("# of splits per partition: " + (splits.size() - 
previousSplitSize));
 +      }
 +    }
 +
 +    // Combine original fileFragments with new VolumeId information
 +    setVolumeMeta(volumeSplits, blockLocations);
 +    splits.addAll(volumeSplits);
 +    LOG.info("Total # of splits: " + splits.size());
 +    return splits;
 +  }
 +
 +  private void setVolumeMeta(List<Fragment> splits, final List<BlockLocation> 
blockLocations)
 +      throws IOException {
 +
 +    int locationSize = blockLocations.size();
 +    int splitSize = splits.size();
 +    if (locationSize == 0 || splitSize == 0) return;
 +
 +    if (locationSize != splitSize) {
 +      // splits and locations don't match up
 +      LOG.warn("Number of block locations not equal to number of splits: "
 +          + "#locations=" + locationSize
 +          + " #splits=" + splitSize);
 +      return;
 +    }
 +
 +    DistributedFileSystem fs = (DistributedFileSystem) DistributedFileSystem.get(conf);
 +    int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
 +    int blockLocationIdx = 0;
 +
 +    Iterator<Fragment> iter = splits.iterator();
 +    while (locationSize > blockLocationIdx) {
 +
 +      int subSize = Math.min(locationSize - blockLocationIdx, lsLimit);
 +      List<BlockLocation> locations = 
blockLocations.subList(blockLocationIdx, blockLocationIdx + subSize);
 +      //BlockStorageLocation containing additional volume location 
information for each replica of each block.
 +      BlockStorageLocation[] blockStorageLocations = 
fs.getFileBlockStorageLocations(locations);
 +
 +      for (BlockStorageLocation blockStorageLocation : blockStorageLocations) 
{
 +        
((FileFragment)iter.next()).setDiskIds(getDiskIds(blockStorageLocation.getVolumeIds()));
 +        blockLocationIdx++;
 +      }
 +    }
 +    LOG.info("# of splits with volumeId " + splitSize);
 +  }
 +
 +  private static class InvalidInputException extends IOException {
 +    List<IOException> errors;
 +    public InvalidInputException(List<IOException> errors) {
 +      this.errors = errors;
 +    }
 +
 +    @Override
 +    public String getMessage(){
 +      StringBuffer sb = new StringBuffer();
 +      int messageLimit = Math.min(errors.size(), 10);
 +      for (int i = 0; i < messageLimit ; i ++) {
 +        sb.append(errors.get(i).getMessage()).append("\n");
 +      }
 +
 +      if(messageLimit < errors.size())
 +        sb.append("skipped .....").append("\n");
 +
 +      return sb.toString();
 +    }
 +  }
 +
 +  @Override
 +  public List<Fragment> getSplits(String tableName, TableDesc table, ScanNode 
scanNode) throws IOException {
 +    return getSplits(tableName, table.getMeta(), table.getSchema(), new 
Path(table.getPath()));
 +  }
 +
 +  @Override
 +  public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException {
 +    if (!tableDesc.isExternal()) {
 +      String [] splitted = CatalogUtil.splitFQTableName(tableDesc.getName());
 +      String databaseName = splitted[0];
 +      String simpleTableName = splitted[1];
 +
 +      // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME})
 +      Path tablePath = StorageUtil.concatPath(tableBaseDir, databaseName, simpleTableName);
 +      tableDesc.setPath(tablePath.toUri());
 +    } else {
 +      Preconditions.checkState(tableDesc.getPath() != null, "ERROR: LOCATION 
must be given.");
 +    }
 +
 +    Path path = new Path(tableDesc.getPath());
 +
 +    FileSystem fs = path.getFileSystem(conf);
 +    TableStats stats = new TableStats();
 +    if (tableDesc.isExternal()) {
 +      if (!fs.exists(path)) {
 +        LOG.error(path.toUri() + " does not exist");
 +        throw new IOException("ERROR: " + path.toUri() + " does not exist");
 +      }
 +    } else {
 +      fs.mkdirs(path);
 +    }
 +
 +    long totalSize = 0;
 +
 +    try {
 +      totalSize = calculateSize(path);
 +    } catch (IOException e) {
 +      LOG.warn("Cannot calculate the size of the relation", e);
 +    }
 +
 +    stats.setNumBytes(totalSize);
 +
 +    if (tableDesc.isExternal()) { // if it is an external table, there is no way to know the exact row number without processing.
 +      stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
 +    }
 +
 +    tableDesc.setStats(stats);
 +  }
 +
 +  @Override
 +  public void purgeTable(TableDesc tableDesc) throws IOException {
 +    try {
 +      Path path = new Path(tableDesc.getPath());
 +      FileSystem fs = path.getFileSystem(conf);
 +      LOG.info("Delete table data dir: " + path);
 +      fs.delete(path, true);
 +    } catch (IOException e) {
 +      throw new InternalError(e.getMessage());
 +    }
 +  }
 +
 +  @Override
-   public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int 
currentPage, int numFragments) throws IOException {
++  public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int 
currentPage, int numResultFragments) throws IOException {
 +    // List the table's data files that are not empty.
 +    // If the table is partitioned, return the list of files that share the same partition key.
 +    Path tablePath = new Path(tableDesc.getPath());
 +    FileSystem fs = tablePath.getFileSystem(conf);
 +
++    // In the case of a partitioned table, we should return data files that share the same partition key.
++    int partitionDepth = 0;
++    if (tableDesc.hasPartition()) {
++      partitionDepth = tableDesc.getPartitionMethod().getExpressionSchema().getColumns().size();
++    }
++
 +    List<FileStatus> nonZeroLengthFiles = new ArrayList<FileStatus>();
 +    if (fs.exists(tablePath)) {
-       getNonZeroLengthDataFiles(fs, tablePath, nonZeroLengthFiles, currentPage, numFragments,
-           new AtomicInteger(0));
++      getNonZeroLengthDataFiles(fs, tablePath, nonZeroLengthFiles, currentPage, numResultFragments,
++          new AtomicInteger(0), tableDesc.hasPartition(), 0, partitionDepth);
 +    }
 +
 +    List<Fragment> fragments = new ArrayList<Fragment>();
 +
-     //In the case of partitioned table, return same partition key data files.
-     int numPartitionColumns = 0;
-     if (tableDesc.hasPartition()) {
-       numPartitionColumns = tableDesc.getPartitionMethod().getExpressionSchema().getColumns().size();
-     }
 +    String[] previousPartitionPathNames = null;
 +    for (FileStatus eachFile: nonZeroLengthFiles) {
 +      FileFragment fileFragment = new FileFragment(tableDesc.getName(), eachFile.getPath(), 0, eachFile.getLen(), null);
 +
-       if (numPartitionColumns > 0) {
++      if (partitionDepth > 0) {
 +        // finding partition key;
 +        Path filePath = fileFragment.getPath();
 +        Path parentPath = filePath;
-         String[] parentPathNames = new String[numPartitionColumns];
-         for (int i = 0; i < numPartitionColumns; i++) {
++        String[] parentPathNames = new String[partitionDepth];
++        for (int i = 0; i < partitionDepth; i++) {
 +          parentPath = parentPath.getParent();
-           parentPathNames[numPartitionColumns - i - 1] = parentPath.getName();
++          parentPathNames[partitionDepth - i - 1] = parentPath.getName();
 +        }
 +
 +        // If current partitionKey == previousPartitionKey, add to result.
 +        if (previousPartitionPathNames == null) {
 +          fragments.add(fileFragment);
 +        } else if (previousPartitionPathNames != null && 
Arrays.equals(previousPartitionPathNames, parentPathNames)) {
 +          fragments.add(fileFragment);
 +        } else {
 +          break;
 +        }
 +        previousPartitionPathNames = parentPathNames;
 +      } else {
 +        fragments.add(fileFragment);
 +      }
 +    }
 +
 +    return fragments;
 +  }
 +
++  /**
++   * Collects the non-empty data files under the given path.
++   *
++   * @param fs The file system
++   * @param path The table path
++   * @param result The final result files to be used
++   * @param startFileIndex The index of the first file to include in the result
++   * @param numResultFiles The maximum number of files to return
++   * @param currentFileIndex A running counter of visited files
++   * @param partitioned A flag to indicate if this table is partitioned
++   * @param currentDepth Current visiting depth of partition directories
++   * @param maxDepth The partition depth of this table
++   * @throws IOException
++   */
 +  private void getNonZeroLengthDataFiles(FileSystem fs, Path path, List<FileStatus> result,
 +                                                int startFileIndex, int numResultFiles,
-                                                 AtomicInteger currentFileIndex) throws IOException {
++                                                AtomicInteger currentFileIndex, boolean partitioned,
++                                                int currentDepth, int maxDepth) throws IOException {
++    // Intermediate directory
 +    if (fs.isDirectory(path)) {
-       FileStatus[] files = fs.listStatus(path, FileStorageManager.hiddenFileFilter);
++
++      FileStatus[] files = fs.listStatus(path, StorageManager.hiddenFileFilter);
++
 +      if (files != null && files.length > 0) {
++
 +        for (FileStatus eachFile : files) {
++
++          // check whether enough files have already been found
 +          if (result.size() >= numResultFiles) {
 +            return;
 +          }
 +          if (eachFile.isDirectory()) {
-             getNonZeroLengthDataFiles(fs, eachFile.getPath(), result, startFileIndex, numResultFiles,
-                 currentFileIndex);
-           } else if (eachFile.isFile() && eachFile.getLen() > 0) {
++
++            getNonZeroLengthDataFiles(
++                fs,
++                eachFile.getPath(),
++                result,
++                startFileIndex,
++                numResultFiles,
++                currentFileIndex,
++                partitioned,
++                currentDepth + 1, // increment a visiting depth
++                maxDepth);
++
++            // if this is a partitioned table, files located in intermediate directories are ignored.
++            // a file is guaranteed to be in a leaf directory when currentDepth == maxDepth.
++          } else if (eachFile.isFile() && eachFile.getLen() > 0 && (!partitioned || currentDepth == maxDepth)) {
 +            if (currentFileIndex.get() >= startFileIndex) {
 +              result.add(eachFile);
 +            }
 +            currentFileIndex.incrementAndGet();
 +          }
 +        }
 +      }
++
++      // Files located in a leaf directory
 +    } else {
 +      FileStatus fileStatus = fs.getFileStatus(path);
 +      if (fileStatus != null && fileStatus.getLen() > 0) {
 +        if (currentFileIndex.get() >= startFileIndex) {
 +          result.add(fileStatus);
 +        }
 +        currentFileIndex.incrementAndGet();
 +        if (result.size() >= numResultFiles) {
 +          return;
 +        }
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public StorageProperty getStorageProperty() {
 +    StorageProperty storageProperty = new StorageProperty();
 +    storageProperty.setSortedInsert(false);
 +    if (storeType == StoreType.RAW) {
 +      storageProperty.setSupportsInsertInto(false);
 +    } else {
 +      storageProperty.setSupportsInsertInto(true);
 +    }
 +
 +    return storageProperty;
 +  }
 +
 +  @Override
 +  public void closeStorageManager() {
 +  }
 +
 +  @Override
 +  public void beforeInsertOrCATS(LogicalNode node) throws IOException {
 +  }
 +
 +  @Override
 +  public void rollbackOutputCommit(LogicalNode node) throws IOException {
 +  }
 +
 +  @Override
 +  public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc,
 +                                          Schema inputSchema, SortSpec[] sortSpecs, TupleRange dataRange)
 +      throws IOException {
 +    return null;
 +  }
 +
 +  /**
 +   * Returns Scanner instance.
 +   *
 +   * @param conf The system configuration
 +   * @param meta The table meta
 +   * @param schema The input schema
 +   * @param path The data file path
 +   * @return Scanner instance
 +   * @throws java.io.IOException
 +   */
 +  public static synchronized SeekableScanner getSeekableScanner(
 +      TajoConf conf, TableMeta meta, Schema schema, Path path) throws IOException {
 +
 +    FileSystem fs = path.getFileSystem(conf);
 +    FileStatus status = fs.getFileStatus(path);
 +    FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
 +
 +    return getSeekableScanner(conf, meta, schema, fragment, schema);
 +  }
 +}
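
A minimal usage sketch for the split API above. It assumes an already-initialized FileStorageManager instance obtained from Tajo's storage-manager factory (the factory is not shown in this diff); everything else uses only methods defined in this file.

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.fs.Path;
    import org.apache.tajo.catalog.Schema;
    import org.apache.tajo.catalog.TableMeta;
    import org.apache.tajo.storage.FileStorageManager;
    import org.apache.tajo.storage.fragment.Fragment;

    public class SplitExample {
      // One Fragment per HDFS block for splittable formats, or per file
      // otherwise, mirroring the branches in getSplits(...) above.
      public static void printSplits(FileStorageManager sm, String tableName,
                                     TableMeta meta, Schema schema, Path tablePath)
          throws IOException {
        List<Fragment> splits = sm.getSplits(tableName, meta, schema, tablePath);
        System.out.println("Total # of splits: " + splits.size());
        for (Fragment fragment : splits) {
          System.out.println(fragment);
        }
      }
    }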

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
index 0000000,0000000..c1f63a8
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
@@@ -1,0 -1,0 +1,220 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage.json;
++
++
++import io.netty.buffer.ByteBuf;
++import net.minidev.json.JSONArray;
++import net.minidev.json.JSONObject;
++import net.minidev.json.parser.JSONParser;
++import org.apache.tajo.catalog.Schema;
++import org.apache.tajo.catalog.SchemaUtil;
++import org.apache.tajo.catalog.TableMeta;
++import org.apache.tajo.common.TajoDataTypes.Type;
++import org.apache.tajo.common.exception.NotImplementedException;
++import org.apache.tajo.datum.DatumFactory;
++import org.apache.tajo.datum.NullDatum;
++import org.apache.tajo.storage.Tuple;
++import org.apache.tajo.storage.text.TextLineDeserializer;
++import org.apache.tajo.storage.text.TextLineParsingError;
++
++import java.io.IOException;
++import java.util.Iterator;
++
++public class JsonLineDeserializer extends TextLineDeserializer {
++  private JSONParser parser;
++  private Type [] types;
++  private String [] columnNames;
++
++  public JsonLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
++    super(schema, meta, targetColumnIndexes);
++  }
++
++  @Override
++  public void init() {
++    types = SchemaUtil.toTypes(schema);
++    columnNames = SchemaUtil.toSimpleNames(schema);
++
++    parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE);
++  }
++
++  @Override
++  public void deserialize(ByteBuf buf, Tuple output) throws IOException, TextLineParsingError {
++    byte [] line = new byte[buf.readableBytes()];
++    buf.readBytes(line);
++
++    try {
++      JSONObject object = (JSONObject) parser.parse(line);
++
++      for (int i = 0; i < targetColumnIndexes.length; i++) {
++        int actualIdx = targetColumnIndexes[i];
++        String fieldName = columnNames[actualIdx];
++
++        if (!object.containsKey(fieldName)) {
++          output.put(actualIdx, NullDatum.get());
++          continue;
++        }
++
++        switch (types[actualIdx]) {
++        case BOOLEAN:
++          String boolStr = object.getAsString(fieldName);
++          if (boolStr != null) {
++            output.put(actualIdx, DatumFactory.createBool(boolStr.equals("true")));
++          } else {
++            output.put(actualIdx, NullDatum.get());
++          }
++          break;
++        case CHAR:
++          String charStr = object.getAsString(fieldName);
++          if (charStr != null) {
++            output.put(actualIdx, DatumFactory.createChar(charStr));
++          } else {
++            output.put(actualIdx, NullDatum.get());
++          }
++          break;
++        case INT1:
++        case INT2:
++          Number int2Num = object.getAsNumber(fieldName);
++          if (int2Num != null) {
++            output.put(actualIdx, DatumFactory.createInt2(int2Num.shortValue()));
++          } else {
++            output.put(actualIdx, NullDatum.get());
++          }
++          break;
++        case INT4:
++          Number int4Num = object.getAsNumber(fieldName);
++          if (int4Num != null) {
++            output.put(actualIdx, DatumFactory.createInt4(int4Num.intValue()));
++          } else {
++            output.put(actualIdx, NullDatum.get());
++          }
++          break;
++        case INT8:
++          Number int8Num = object.getAsNumber(fieldName);
++          if (int8Num != null) {
++            output.put(actualIdx, DatumFactory.createInt8(int8Num.longValue()));
++          } else {
++            output.put(actualIdx, NullDatum.get());
++          }
++          break;
++        case FLOAT4:
++          Number float4Num = object.getAsNumber(fieldName);
++          if (float4Num != null) {
++            output.put(actualIdx, DatumFactory.createFloat4(float4Num.floatValue()));
++          } else {
++            output.put(actualIdx, NullDatum.get());
++          }
++          break;
++        case FLOAT8:
++          Number float8Num = object.getAsNumber(fieldName);
++          if (float8Num != null) {
++            output.put(actualIdx, DatumFactory.createFloat8(float8Num.doubleValue()));
++          } else {
++            output.put(actualIdx, NullDatum.get());
++          }
++          break;
++        case TEXT:
++          String textStr = object.getAsString(fieldName);
++          if (textStr != null) {
++            output.put(actualIdx, DatumFactory.createText(textStr));
++          } else {
++            output.put(actualIdx, NullDatum.get());
++          }
++          break;
++        case TIMESTAMP:
++          String timestampStr = object.getAsString(fieldName);
++          if (timestampStr != null) {
++            output.put(actualIdx, DatumFactory.createTimestamp(timestampStr));
++          } else {
++            output.put(actualIdx, NullDatum.get());
++          }
++          break;
++        case TIME:
++          String timeStr = object.getAsString(fieldName);
++          if (timeStr != null) {
++            output.put(actualIdx, DatumFactory.createTime(timeStr));
++          } else {
++            output.put(actualIdx, NullDatum.get());
++          }
++          break;
++        case DATE:
++          String dateStr = object.getAsString(fieldName);
++          if (dateStr != null) {
++            output.put(actualIdx, DatumFactory.createDate(dateStr));
++          } else {
++            output.put(actualIdx, NullDatum.get());
++          }
++          break;
++        case BIT:
++        case BINARY:
++        case VARBINARY:
++        case BLOB: {
++          Object jsonObject = object.get(fieldName);
++
++          if (jsonObject == null) {
++            output.put(actualIdx, NullDatum.get());
++            break;
++          } else if (jsonObject instanceof String) {
++            output.put(actualIdx, DatumFactory.createBlob((String) jsonObject));
++          } else if (jsonObject instanceof JSONArray) {
++            JSONArray jsonArray = (JSONArray) jsonObject;
++            byte[] bytes = new byte[jsonArray.size()];
++            Iterator<Object> it = jsonArray.iterator();
++            int arrayIdx = 0;
++            while (it.hasNext()) {
++              bytes[arrayIdx++] = ((Long) it.next()).byteValue();
++            }
++            if (bytes.length > 0) {
++              output.put(actualIdx, DatumFactory.createBlob(bytes));
++            } else {
++              output.put(actualIdx, NullDatum.get());
++            }
++            break;
++          } else {
++            throw new IOException("Unknown json object: " + 
object.getClass().getSimpleName());
++          }
++          break;
++        }
++        case INET4:
++          String inetStr = object.getAsString(fieldName);
++          if (inetStr != null) {
++            output.put(actualIdx, DatumFactory.createInet4(inetStr));
++          } else {
++            output.put(actualIdx, NullDatum.get());
++          }
++          break;
++
++        case NULL_TYPE:
++          output.put(actualIdx, NullDatum.get());
++          break;
++
++        default:
++          throw new NotImplementedException(types[actualIdx].name() + " is 
not supported.");
++        }
++      }
++
++    } catch (Throwable e) {
++      throw new IOException(e);
++    }
++  }
++
++  @Override
++  public void release() {
++  }
++}
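
A minimal sketch of driving the deserializer above. StoreType.JSON and CatalogUtil.newTableMeta(StoreType) are assumptions about this version's catalog API and do not appear in this diff; VTuple is Tajo's standard Tuple implementation.

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import org.apache.tajo.catalog.CatalogUtil;
    import org.apache.tajo.catalog.Schema;
    import org.apache.tajo.catalog.TableMeta;
    import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
    import org.apache.tajo.common.TajoDataTypes.Type;
    import org.apache.tajo.storage.Tuple;
    import org.apache.tajo.storage.VTuple;
    import org.apache.tajo.storage.json.JsonLineDeserializer;

    public class JsonDeserializeExample {
      public static void main(String[] args) throws Exception {
        Schema schema = new Schema();
        schema.addColumn("id", Type.INT4);
        schema.addColumn("name", Type.TEXT);

        TableMeta meta = CatalogUtil.newTableMeta(StoreType.JSON); // assumed API
        JsonLineDeserializer deser = new JsonLineDeserializer(schema, meta, new int[]{0, 1});
        deser.init();

        ByteBuf line = Unpooled.wrappedBuffer("{\"id\": 7, \"name\": \"tajo\"}".getBytes());
        Tuple tuple = new VTuple(schema.size());
        deser.deserialize(line, tuple); // a missing key becomes NullDatum
        System.out.println(tuple);
      }
    }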

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java
index 0000000,0000000..6db2c29
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java
@@@ -1,0 -1,0 +1,37 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage.json;
++
++import org.apache.tajo.catalog.Schema;
++import org.apache.tajo.catalog.TableMeta;
++import org.apache.tajo.storage.text.TextLineDeserializer;
++import org.apache.tajo.storage.text.TextLineSerDe;
++import org.apache.tajo.storage.text.TextLineSerializer;
++
++public class JsonLineSerDe extends TextLineSerDe {
++  @Override
++  public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
++    return new JsonLineDeserializer(schema, meta, targetColumnIndexes);
++  }
++
++  @Override
++  public TextLineSerializer createSerializer(Schema schema, TableMeta meta) {
++    return new JsonLineSerializer(schema, meta);
++  }
++}
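
The SerDe is a plain factory: one instance hands out both directions of the JSON line format. A short wiring sketch using only the two methods above:

    import org.apache.tajo.catalog.Schema;
    import org.apache.tajo.catalog.TableMeta;
    import org.apache.tajo.storage.json.JsonLineSerDe;
    import org.apache.tajo.storage.text.TextLineDeserializer;
    import org.apache.tajo.storage.text.TextLineSerializer;

    public class JsonSerDeWiring {
      // Both objects must be init()-ed before use and release()-d afterwards.
      public static void wire(Schema schema, TableMeta meta, int[] targets) {
        JsonLineSerDe serde = new JsonLineSerDe();
        TextLineSerializer serializer = serde.createSerializer(schema, meta);
        TextLineDeserializer deserializer = serde.createDeserializer(schema, meta, targets);
        serializer.init();
        deserializer.init();
      }
    }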

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
index 0000000,0000000..cd31ada
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
@@@ -1,0 -1,0 +1,130 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage.json;
++
++
++import net.minidev.json.JSONObject;
++import org.apache.tajo.catalog.Schema;
++import org.apache.tajo.catalog.SchemaUtil;
++import org.apache.tajo.catalog.TableMeta;
++import org.apache.tajo.common.TajoDataTypes.Type;
++import org.apache.tajo.common.exception.NotImplementedException;
++import org.apache.tajo.datum.TextDatum;
++import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
++import org.apache.tajo.storage.Tuple;
++import org.apache.tajo.storage.text.TextLineSerializer;
++
++import java.io.IOException;
++import java.io.OutputStream;
++
++public class JsonLineSerializer extends TextLineSerializer {
++  private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
++
++  private Type [] types;
++  private String [] simpleNames;
++  private int columnNum;
++
++
++  public JsonLineSerializer(Schema schema, TableMeta meta) {
++    super(schema, meta);
++  }
++
++  @Override
++  public void init() {
++    types = SchemaUtil.toTypes(schema);
++    simpleNames = SchemaUtil.toSimpleNames(schema);
++    columnNum = schema.size();
++  }
++
++  @Override
++  public int serialize(OutputStream out, Tuple input) throws IOException {
++    JSONObject jsonObject = new JSONObject();
++
++    for (int i = 0; i < columnNum; i++) {
++      if (input.isNull(i)) {
++        continue;
++      }
++
++      String fieldName = simpleNames[i];
++      Type type = types[i];
++
++      switch (type) {
++
++      case BOOLEAN:
++        jsonObject.put(fieldName, input.getBool(i));
++        break;
++
++      case INT1:
++      case INT2:
++        jsonObject.put(fieldName, input.getInt2(i));
++        break;
++
++      case INT4:
++        jsonObject.put(fieldName, input.getInt4(i));
++        break;
++
++      case INT8:
++        jsonObject.put(fieldName, input.getInt8(i));
++        break;
++
++      case FLOAT4:
++        jsonObject.put(fieldName, input.getFloat4(i));
++        break;
++
++      case FLOAT8:
++        jsonObject.put(fieldName, input.getFloat8(i));
++        break;
++
++      case CHAR:
++      case TEXT:
++      case VARCHAR:
++      case INET4:
++      case TIMESTAMP:
++      case DATE:
++      case TIME:
++      case INTERVAL:
++        jsonObject.put(fieldName, input.getText(i));
++        break;
++
++      case BIT:
++      case BINARY:
++      case BLOB:
++      case VARBINARY:
++        jsonObject.put(fieldName, input.getBytes(i));
++        break;
++
++      case NULL_TYPE:
++        break;
++
++      default:
++        throw new NotImplementedException(types[i].name() + " is not 
supported.");
++      }
++    }
++
++    String jsonStr = jsonObject.toJSONString();
++    byte [] jsonBytes = jsonStr.getBytes(TextDatum.DEFAULT_CHARSET);
++    out.write(jsonBytes);
++    return jsonBytes.length;
++  }
++
++  @Override
++  public void release() {
++
++  }
++}
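
A minimal serialization sketch for the class above, under the same catalog-API assumptions as the deserializer example (StoreType.JSON, CatalogUtil.newTableMeta):

    import java.io.ByteArrayOutputStream;

    import org.apache.tajo.catalog.CatalogUtil;
    import org.apache.tajo.catalog.Schema;
    import org.apache.tajo.catalog.TableMeta;
    import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
    import org.apache.tajo.common.TajoDataTypes.Type;
    import org.apache.tajo.datum.DatumFactory;
    import org.apache.tajo.storage.VTuple;
    import org.apache.tajo.storage.json.JsonLineSerializer;

    public class JsonSerializeExample {
      public static void main(String[] args) throws Exception {
        Schema schema = new Schema();
        schema.addColumn("id", Type.INT4);
        schema.addColumn("name", Type.TEXT);
        TableMeta meta = CatalogUtil.newTableMeta(StoreType.JSON); // assumed API

        JsonLineSerializer serializer = new JsonLineSerializer(schema, meta);
        serializer.init();

        VTuple tuple = new VTuple(schema.size());
        tuple.put(0, DatumFactory.createInt4(7));
        tuple.put(1, DatumFactory.createText("tajo"));

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int written = serializer.serialize(out, tuple); // null columns are omitted
        System.out.println(out + " (" + written + " bytes)");
      }
    }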

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
index 1448885,0000000..86319e1
mode 100644,000000..100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
@@@ -1,154 -1,0 +1,170 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.tajo.storage.text;
 +
 +import io.netty.buffer.ByteBuf;
 +import io.netty.util.CharsetUtil;
 +import org.apache.tajo.storage.BufferPool;
 +import org.apache.tajo.storage.ByteBufInputChannel;
 +
 +import java.io.Closeable;
 +import java.io.IOException;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +public class ByteBufLineReader implements Closeable {
 +  private static int DEFAULT_BUFFER = 64 * 1024;
 +
 +  private int bufferSize;
 +  private long readBytes;
++  private boolean eof = false;
 +  private ByteBuf buffer;
 +  private final ByteBufInputChannel channel;
 +  private final AtomicInteger tempReadBytes = new AtomicInteger();
 +  private final LineSplitProcessor processor = new LineSplitProcessor();
 +
 +  public ByteBufLineReader(ByteBufInputChannel channel) {
 +    this(channel, BufferPool.directBuffer(DEFAULT_BUFFER));
 +  }
 +
 +  public ByteBufLineReader(ByteBufInputChannel channel, ByteBuf buf) {
 +    this.readBytes = 0;
 +    this.channel = channel;
 +    this.buffer = buf;
 +    this.bufferSize = buf.capacity();
 +  }
 +
 +  public long readBytes() {
 +    return readBytes - buffer.readableBytes();
 +  }
 +
 +  public long available() throws IOException {
 +    return channel.available() + buffer.readableBytes();
 +  }
 +
 +  @Override
 +  public void close() throws IOException {
 +    if (this.buffer.refCnt() > 0) {
 +      this.buffer.release();
 +    }
 +    this.channel.close();
 +  }
 +
 +  public String readLine() throws IOException {
 +    ByteBuf buf = readLineBuf(tempReadBytes);
 +    if (buf != null) {
 +      return buf.toString(CharsetUtil.UTF_8);
 +    }
 +    return null;
 +  }
 +
 +  private void fillBuffer() throws IOException {
 +
 +    int tailBytes = 0;
 +    if (this.readBytes > 0) {
 +      this.buffer.markReaderIndex();
 +      this.buffer.discardSomeReadBytes();  // compact the buffer
 +      tailBytes = this.buffer.writerIndex();
 +      if (!this.buffer.isWritable()) {
 +        // a single line is larger than the buffer
 +        BufferPool.ensureWritable(buffer, bufferSize);
 +        this.bufferSize = buffer.capacity();
 +      }
 +    }
 +
 +    boolean release = true;
 +    try {
 +      int readBytes = tailBytes;
 +      for (; ; ) {
 +        int localReadBytes = buffer.writeBytes(channel, bufferSize - readBytes);
 +        if (localReadBytes < 0) {
++          if (tailBytes == readBytes) {
++            // no more bytes are in the channel
++            eof = true;
++          }
 +          break;
 +        }
 +        readBytes += localReadBytes;
 +        if (readBytes == bufferSize) {
 +          break;
 +        }
 +      }
 +      this.readBytes += (readBytes - tailBytes);
 +      release = false;
-       this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip past buffer (tail)
++      if (!eof) {
++        this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip past buffer (tail)
++      }
 +    } finally {
 +      if (release) {
 +        buffer.release();
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Read a line terminated by one of CR, LF, or CRLF.
 +   */
 +  public ByteBuf readLineBuf(AtomicInteger reads) throws IOException {
++    if(eof) return null;
++
 +    int startIndex = buffer.readerIndex();
 +    int readBytes;
 +    int readable;
 +    int newlineLength; //length of terminating newline
 +
 +    loop:
 +    while (true) {
 +      readable = buffer.readableBytes();
 +      if (readable <= 0) {
 +        buffer.readerIndex(startIndex);
 +        fillBuffer(); //compact and fill buffer
 +        if (!buffer.isReadable()) {
 +          return null;
 +        } else {
-           startIndex = 0; // reset the line start position
++          if (!eof) startIndex = 0; // reset the line start position
++          else startIndex = buffer.readerIndex();
 +        }
 +        readable = buffer.readableBytes();
 +      }
 +
 +      int endIndex = buffer.forEachByte(buffer.readerIndex(), readable, processor);
 +      if (endIndex < 0) {
-         buffer.readerIndex(buffer.writerIndex());
++        // no terminating newline was found
++        buffer.readerIndex(buffer.writerIndex()); // set to end buffer
++        if(eof){
++          readBytes = buffer.readerIndex() - startIndex;
++          newlineLength = 0;
++          break loop;
++        }
 +      } else {
 +        buffer.readerIndex(endIndex + 1);
 +        readBytes = buffer.readerIndex() - startIndex;
 +        if (processor.isPrevCharCR() && buffer.isReadable()
 +            && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) 
{
 +          buffer.skipBytes(1);
 +          newlineLength = 2;
 +        } else {
 +          newlineLength = 1;
 +        }
 +        break loop;
 +      }
 +    }
 +    reads.set(readBytes);
 +    return buffer.slice(startIndex, readBytes - newlineLength);
 +  }
 +}
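
The eof handling added above lets the reader return a final line that lacks a terminating newline. A small sketch, assuming ByteBufInputChannel can wrap a plain InputStream (its constructors are not shown in this diff):

    import java.io.ByteArrayInputStream;

    import org.apache.tajo.storage.ByteBufInputChannel;
    import org.apache.tajo.storage.text.ByteBufLineReader;

    public class LineReaderExample {
      public static void main(String[] args) throws Exception {
        byte[] data = "first\r\nsecond\nlast-without-newline".getBytes();
        // assumed constructor: wraps a plain InputStream as a readable channel
        ByteBufInputChannel channel = new ByteBufInputChannel(new ByteArrayInputStream(data));
        ByteBufLineReader reader = new ByteBufLineReader(channel);
        String line;
        while ((line = reader.readLine()) != null) {
          System.out.println(line); // with this change, the unterminated last line is returned too
        }
        reader.close();
      }
    }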

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
index 0000000,0000000..f2eebc6
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
@@@ -1,0 -1,0 +1,96 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage.text;
++
++import io.netty.buffer.ByteBuf;
++import org.apache.tajo.catalog.Schema;
++import org.apache.tajo.catalog.TableMeta;
++import org.apache.tajo.datum.Datum;
++import org.apache.tajo.storage.FieldSerializerDeserializer;
++import org.apache.tajo.storage.Tuple;
++
++import java.io.IOException;
++
++public class CSVLineDeserializer extends TextLineDeserializer {
++  private FieldSplitProcessor processor;
++  private FieldSerializerDeserializer fieldSerDer;
++  private ByteBuf nullChars;
++
++  public CSVLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
++    super(schema, meta, targetColumnIndexes);
++  }
++
++  @Override
++  public void init() {
++    this.processor = new FieldSplitProcessor(CSVLineSerDe.getFieldDelimiter(meta));
++
++    if (nullChars != null) {
++      nullChars.release();
++    }
++    nullChars = TextLineSerDe.getNullChars(meta);
++
++    fieldSerDer = new TextFieldSerializerDeserializer();
++  }
++
++  public void deserialize(final ByteBuf lineBuf, Tuple output) throws IOException, TextLineParsingError {
++    int[] projection = targetColumnIndexes;
++    if (lineBuf == null || targetColumnIndexes == null || targetColumnIndexes.length == 0) {
++      return;
++    }
++
++    final int rowLength = lineBuf.readableBytes();
++    int start = 0, fieldLength = 0, end = 0;
++
++    //Projection
++    int currentTarget = 0;
++    int currentIndex = 0;
++
++    while (end != -1) {
++      end = lineBuf.forEachByte(start, rowLength - start, processor);
++
++      if (end < 0) {
++        fieldLength = rowLength - start;
++      } else {
++        fieldLength = end - start;
++      }
++
++      if (projection.length > currentTarget && currentIndex == projection[currentTarget]) {
++        lineBuf.setIndex(start, start + fieldLength);
++        Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars);
++        output.put(currentIndex, datum);
++        currentTarget++;
++      }
++
++      if (projection.length == currentTarget) {
++        break;
++      }
++
++      start = end + 1;
++      currentIndex++;
++    }
++  }
++
++  @Override
++  public void release() {
++    if (nullChars != null) {
++      nullChars.release();
++      nullChars = null;
++    }
++  }
++}
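
The deserialize() loop above walks the line once, splitting on the field delimiter with forEachByte() and materializing a Datum only for the columns listed in the (sorted) targetColumnIndexes projection, breaking out as soon as the last target is filled. The following self-contained sketch reproduces that walk over a plain byte array rather than a ByteBuf; the class and helper names are illustrative, not part of the patch:

    // A stand-alone analogue of the projection walk (illustrative only).
    public final class ProjectionWalkDemo {
      public static void main(String[] args) {
        byte[] line = "a,b,c".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        int[] projection = {0, 2};  // sorted target column indexes
        int start = 0, currentTarget = 0, currentIndex = 0;
        while (currentTarget < projection.length) {
          int end = indexOf(line, (byte) ',', start);        // mimics forEachByte()
          int fieldEnd = (end < 0) ? line.length : end;
          if (currentIndex == projection[currentTarget]) {   // projected column?
            System.out.println("column " + currentIndex + " = "
                + new String(line, start, fieldEnd - start));
            currentTarget++;                                 // advance the projection cursor
          }
          if (end < 0) break;                                // no more delimiters
          start = end + 1;
          currentIndex++;
        }
        // prints: column 0 = a, then column 2 = c
      }

      private static int indexOf(byte[] buf, byte target, int from) {
        for (int i = from; i < buf.length; i++) {
          if (buf[i] == target) return i;
        }
        return -1;
      }
    }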

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
----------------------------------------------------------------------
diff --cc 
tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
index 0000000,0000000..2fe7f23
new file mode 100644
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
@@@ -1,0 -1,0 +1,41 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage.text;
++
++import org.apache.commons.lang.StringEscapeUtils;
++import org.apache.tajo.catalog.Schema;
++import org.apache.tajo.catalog.TableMeta;
++import org.apache.tajo.storage.StorageConstants;
++
++public class CSVLineSerDe extends TextLineSerDe {
++  @Override
++  public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
++    return new CSVLineDeserializer(schema, meta, targetColumnIndexes);
++  }
++
++  @Override
++  public TextLineSerializer createSerializer(Schema schema, TableMeta meta) {
++    return new CSVLineSerializer(schema, meta);
++  }
++
++  public static char getFieldDelimiter(TableMeta meta) {
++    return StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_DELIMITER,
++        StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
++  }
++}
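
Note that getFieldDelimiter() runs the raw option value through StringEscapeUtils.unescapeJava() before taking the first character, so escape sequences stored as plain text in the catalog resolve to real control characters. A few hedged examples of that resolution (the option values here are illustrative; StringEscapeUtils is the same commons-lang class imported above):

    // Illustrative: what unescapeJava() does to typical delimiter options.
    char tab  = StringEscapeUtils.unescapeJava("\\t").charAt(0);      // '\t' (tab)
    char soh  = StringEscapeUtils.unescapeJava("\\u0001").charAt(0);  // 0x01 (Hive-style ^A)
    char pipe = StringEscapeUtils.unescapeJava("|").charAt(0);        // '|' unchanged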

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
----------------------------------------------------------------------
diff --cc 
tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
index 0000000,0000000..48154eb
new file mode 100644
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
@@@ -1,0 -1,0 +1,70 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage.text;
++
++import org.apache.tajo.catalog.Schema;
++import org.apache.tajo.catalog.TableMeta;
++import org.apache.tajo.datum.Datum;
++import org.apache.tajo.storage.FieldSerializerDeserializer;
++import org.apache.tajo.storage.Tuple;
++
++import java.io.IOException;
++import java.io.OutputStream;
++
++public class CSVLineSerializer extends TextLineSerializer {
++  private FieldSerializerDeserializer serde;
++
++  private byte [] nullChars;
++  private char delimiter;
++  private int columnNum;
++
++  public CSVLineSerializer(Schema schema, TableMeta meta) {
++    super(schema, meta);
++  }
++
++  @Override
++  public void init() {
++    nullChars = TextLineSerDe.getNullCharsAsBytes(meta);
++    delimiter = CSVLineSerDe.getFieldDelimiter(meta);
++    columnNum = schema.size();
++
++    serde = new TextFieldSerializerDeserializer();
++  }
++
++  @Override
++  public int serialize(OutputStream out, Tuple input) throws IOException {
++    int writtenBytes = 0;
++
++    for (int i = 0; i < columnNum; i++) {
++      Datum datum = input.get(i);
++      writtenBytes += serde.serialize(out, datum, schema.getColumn(i), i, nullChars);
++
++      if (columnNum - 1 > i) {
++        out.write((byte) delimiter);
++        writtenBytes += 1;
++      }
++    }
++
++    return writtenBytes;
++  }
++
++  @Override
++  public void release() {
++  }
++}
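
A point worth noting: serialize() writes the delimiter between fields only (the `columnNum - 1 > i` guard) and does not emit a trailing newline; DelimitedTextFileAppender.addTuple() appends the LF after each row. A minimal sketch of the same between-fields pattern, with plain strings standing in for serialized datums (illustrative only):

    // Illustrative: the delimiter-between-fields pattern used by serialize().
    String[] fields = {"a", "b", "c"};
    StringBuilder row = new StringBuilder();
    for (int i = 0; i < fields.length; i++) {
      row.append(fields[i]);
      if (fields.length - 1 > i) {   // same guard as in serialize()
        row.append('|');
      }
    }
    // row.toString() -> "a|b|c"; the appender adds the trailing LF separately.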

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --cc 
tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index a337509,0000000..7848198
mode 100644,000000..100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@@ -1,468 -1,0 +1,475 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.tajo.storage.text;
 +
 +import io.netty.buffer.ByteBuf;
- import org.apache.commons.lang.StringEscapeUtils;
- import org.apache.commons.lang.StringUtils;
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.IOUtils;
 +import org.apache.hadoop.io.compress.CompressionCodec;
 +import org.apache.hadoop.io.compress.CompressionCodecFactory;
 +import org.apache.hadoop.io.compress.CompressionOutputStream;
 +import org.apache.hadoop.io.compress.Compressor;
 +import org.apache.tajo.QueryUnitAttemptId;
 +import org.apache.tajo.catalog.Schema;
 +import org.apache.tajo.catalog.TableMeta;
 +import org.apache.tajo.catalog.statistics.TableStats;
- import org.apache.tajo.datum.Datum;
- import org.apache.tajo.datum.NullDatum;
 +import org.apache.tajo.storage.*;
 +import org.apache.tajo.storage.compress.CodecPool;
 +import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
++import org.apache.tajo.storage.fragment.FileFragment;
 +import org.apache.tajo.storage.fragment.Fragment;
 +import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
++import org.apache.tajo.util.ReflectionUtil;
 +
 +import java.io.BufferedOutputStream;
 +import java.io.DataOutputStream;
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.util.Arrays;
++import java.util.Map;
++import java.util.concurrent.ConcurrentHashMap;
++
++import static org.apache.tajo.storage.StorageConstants.DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM;
++import static org.apache.tajo.storage.StorageConstants.TEXT_ERROR_TOLERANCE_MAXNUM;
 +
 +public class DelimitedTextFile {
 +
 +  public static final byte LF = '\n';
-   public static int EOF = -1;
 +
 +  private static final Log LOG = LogFactory.getLog(DelimitedTextFile.class);
 +
++  /** Caches line serde classes. */
++  private static final Map<String, Class<? extends TextLineSerDe>> serdeClassCache =
++      new ConcurrentHashMap<String, Class<? extends TextLineSerDe>>();
++
++  /**
++   * By default, DelimitedTextFileScanner uses CSVLineSerDe. If the table property
++   * 'text.serde.class' is given, it will use the specified serde class instead.
++   *
++   * @return TextLineSerDe
++   */
++  public static TextLineSerDe getLineSerde(TableMeta meta) {
++    TextLineSerDe lineSerde;
++
++    String serDeClassName;
++
++    // if no serde class is given, the CSV line serde will be used.
++    serDeClassName = meta.getOption(StorageConstants.TEXT_SERDE_CLASS, StorageConstants.DEFAULT_TEXT_SERDE_CLASS);
++
++    try {
++      Class<? extends TextLineSerDe> serdeClass;
++
++      if (serdeClassCache.containsKey(serDeClassName)) {
++        serdeClass = serdeClassCache.get(serDeClassName);
++      } else {
++        serdeClass = (Class<? extends TextLineSerDe>) Class.forName(serDeClassName);
++        serdeClassCache.put(serDeClassName, serdeClass);
++      }
++      lineSerde = (TextLineSerDe) ReflectionUtil.newInstance(serdeClass);
++    } catch (Throwable e) {
++      throw new RuntimeException("TextLineSerDe class cannot be initialized.", e);
++    }
++
++    return lineSerde;
++  }
++
 +  public static class DelimitedTextFileAppender extends FileAppender {
 +    private final TableMeta meta;
 +    private final Schema schema;
-     private final int columnNum;
 +    private final FileSystem fs;
 +    private FSDataOutputStream fos;
 +    private DataOutputStream outputStream;
 +    private CompressionOutputStream deflateFilter;
-     private char delimiter;
 +    private TableStatistics stats = null;
 +    private Compressor compressor;
 +    private CompressionCodecFactory codecFactory;
 +    private CompressionCodec codec;
 +    private Path compressedPath;
 +    private byte[] nullChars;
 +    private int BUFFER_SIZE = 128 * 1024;
 +    private int bufferedBytes = 0;
 +    private long pos = 0;
 +
 +    private NonSyncByteArrayOutputStream os;
-     private FieldSerializerDeserializer serde;
++    private TextLineSerializer serializer;
 +
 +    public DelimitedTextFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
 +                                     final Schema schema, final TableMeta meta, final Path path)
 +        throws IOException {
 +      super(conf, taskAttemptId, schema, meta, path);
 +      this.fs = path.getFileSystem(conf);
 +      this.meta = meta;
 +      this.schema = schema;
 -       this.delimiter = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.TEXT_DELIMITER,
-           StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
-       this.columnNum = schema.size();
- 
 -       String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.TEXT_NULL,
-           NullDatum.DEFAULT_TEXT));
-       if (StringUtils.isEmpty(nullCharacters)) {
-         nullChars = NullDatum.get().asTextBytes();
-       } else {
-         nullChars = nullCharacters.getBytes();
-       }
++    }
++
++    public TextLineSerDe getLineSerde() {
++      return DelimitedTextFile.getLineSerde(meta);
 +    }
 +
 +    @Override
 +    public void init() throws IOException {
 +      if (!fs.exists(path.getParent())) {
 +        throw new FileNotFoundException(path.toString());
 +      }
 +
 +      if (this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) {
 +        String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC);
 +        codecFactory = new CompressionCodecFactory(conf);
 +        codec = codecFactory.getCodecByClassName(codecName);
 +        compressor = CodecPool.getCompressor(codec);
 +        if (compressor != null) compressor.reset();  //builtin gzip is null
 +
 +        String extension = codec.getDefaultExtension();
 +        compressedPath = path.suffix(extension);
 +
 +        if (fs.exists(compressedPath)) {
 +          throw new AlreadyExistsStorageException(compressedPath);
 +        }
 +
 +        fos = fs.create(compressedPath);
 +        deflateFilter = codec.createOutputStream(fos, compressor);
 +        outputStream = new DataOutputStream(deflateFilter);
 +
 +      } else {
 +        if (fs.exists(path)) {
 +          throw new AlreadyExistsStorageException(path);
 +        }
 +        fos = fs.create(path);
 +        outputStream = new DataOutputStream(new BufferedOutputStream(fos));
 +      }
 +
 +      if (enabledStats) {
 +        this.stats = new TableStatistics(this.schema);
 +      }
 +
-       serde = new TextFieldSerializerDeserializer();
++      serializer = getLineSerde().createSerializer(schema, meta);
++      serializer.init();
 +
 +      if (os == null) {
 +        os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
 +      }
 +
 +      os.reset();
 +      pos = fos.getPos();
 +      bufferedBytes = 0;
 +      super.init();
 +    }
 +
- 
 +    @Override
 +    public void addTuple(Tuple tuple) throws IOException {
-       Datum datum;
-       int rowBytes = 0;
- 
-       for (int i = 0; i < columnNum; i++) {
-         datum = tuple.get(i);
 -         rowBytes += serde.serialize(os, datum, schema.getColumn(i), i, nullChars);
++      // write
++      int rowBytes = serializer.serialize(os, tuple);
 +
-         if (columnNum - 1 > i) {
-           os.write((byte) delimiter);
-           rowBytes += 1;
-         }
-       }
++      // new line
 +      os.write(LF);
 +      rowBytes += 1;
 +
++      // update positions
 +      pos += rowBytes;
 +      bufferedBytes += rowBytes;
++
++      // flush the buffer if necessary
 +      if (bufferedBytes > BUFFER_SIZE) {
 +        flushBuffer();
 +      }
 +      // Statistical section
 +      if (enabledStats) {
 +        stats.incrementRow();
 +      }
 +    }
 +
 +    private void flushBuffer() throws IOException {
 +      if (os.getLength() > 0) {
 +        os.writeTo(outputStream);
 +        os.reset();
 +        bufferedBytes = 0;
 +      }
 +    }
 +
 +    @Override
 +    public long getOffset() throws IOException {
 +      return pos;
 +    }
 +
 +    @Override
 +    public void flush() throws IOException {
 +      flushBuffer();
 +      outputStream.flush();
 +    }
 +
 +    @Override
 +    public void close() throws IOException {
 +
 +      try {
++        serializer.release();
++
 +        if(outputStream != null){
 +          flush();
 +        }
 +
 +        // Statistical section
 +        if (enabledStats) {
 +          stats.setNumBytes(getOffset());
 +        }
 +
 +        if (deflateFilter != null) {
 +          deflateFilter.finish();
 +          deflateFilter.resetState();
 +          deflateFilter = null;
 +        }
 +
 +        os.close();
 +      } finally {
 +        IOUtils.cleanup(LOG, fos);
 +        if (compressor != null) {
 +          CodecPool.returnCompressor(compressor);
 +          compressor = null;
 +        }
 +      }
 +    }
 +
 +    @Override
 +    public TableStats getStats() {
 +      if (enabledStats) {
 +        return stats.getTableStat();
 +      } else {
 +        return null;
 +      }
 +    }
 +
 +    public boolean isCompress() {
 +      return compressor != null;
 +    }
 +
 +    public String getExtension() {
 +      return codec != null ? codec.getDefaultExtension() : "";
 +    }
 +  }
 +
 +  public static class DelimitedTextFileScanner extends FileScanner {
- 
 +    private boolean splittable = false;
 +    private final long startOffset;
-     private final long endOffset;
 +
++    private final long endOffset;
++    /** The number of records actually read */
 +    private int recordCount = 0;
 +    private int[] targetColumnIndexes;
 +
-     private ByteBuf nullChars;
-     private FieldSerializerDeserializer serde;
 +    private DelimitedLineReader reader;
-     private FieldSplitProcessor processor;
++    private TextLineDeserializer deserializer;
++
++    private int errorPrintOutMaxNum = 5;
++    /** Maximum number of permissible errors */
++    private int errorToleranceMaxNum;
++    /** How many errors have occurred? */
++    private int errorNum;
 +
 +    public DelimitedTextFileScanner(Configuration conf, final Schema schema, final TableMeta meta,
 +                                    final Fragment fragment)
 +        throws IOException {
 +      super(conf, schema, meta, fragment);
 +      reader = new DelimitedLineReader(conf, this.fragment);
 +      if (!reader.isCompressed()) {
 +        splittable = true;
 +      }
 +
 +      startOffset = this.fragment.getStartKey();
 +      endOffset = startOffset + fragment.getLength();
 +
-       //Delimiter
 -       String delim = meta.getOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
 -       this.processor = new FieldSplitProcessor(StringEscapeUtils.unescapeJava(delim).charAt(0));
++      errorToleranceMaxNum =
++          Integer.parseInt(meta.getOption(TEXT_ERROR_TOLERANCE_MAXNUM, DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM));
 +    }
 +
++
 +    @Override
 +    public void init() throws IOException {
-       if (nullChars != null) {
-         nullChars.release();
-       }
- 
 -       String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL,
-           NullDatum.DEFAULT_TEXT));
-       byte[] bytes;
-       if (StringUtils.isEmpty(nullCharacters)) {
-         bytes = NullDatum.get().asTextBytes();
-       } else {
-         bytes = nullCharacters.getBytes();
-       }
- 
-       nullChars = BufferPool.directBuffer(bytes.length, bytes.length);
-       nullChars.writeBytes(bytes);
- 
 +      if (reader != null) {
 +        reader.close();
 +      }
 +      reader = new DelimitedLineReader(conf, fragment);
 +      reader.init();
 +      recordCount = 0;
 +
 +      if (targets == null) {
 +        targets = schema.toArray();
 +      }
 +
 +      targetColumnIndexes = new int[targets.length];
 +      for (int i = 0; i < targets.length; i++) {
 +        targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName());
 +      }
 +
-       serde = new TextFieldSerializerDeserializer();
- 
 +      super.init();
 +      Arrays.sort(targetColumnIndexes);
 +      if (LOG.isDebugEnabled()) {
 +        LOG.debug("DelimitedTextFileScanner open:" + fragment.getPath() + "," 
+ startOffset + "," + endOffset);
 +      }
 +
 +      if (startOffset > 0) {
 +        reader.readLine();  // skip the first line
 +      }
-     }
 +
-     public ByteBuf readLine() throws IOException {
-       ByteBuf buf = reader.readLine();
-       if (buf == null) {
-         return null;
-       } else {
-         recordCount++;
-       }
++      deserializer = getLineSerde().createDeserializer(schema, meta, targetColumnIndexes);
++      deserializer.init();
++    }
 +
-       return buf;
++    public TextLineSerDe getLineSerde() {
++      return DelimitedTextFile.getLineSerde(meta);
 +    }
 +
 +    @Override
 +    public float getProgress() {
 +      try {
 +        if (!reader.isReadable()) {
 +          return 1.0f;
 +        }
 +        long filePos = reader.getCompressedPosition();
 +        if (startOffset == filePos) {
 +          return 0.0f;
 +        } else {
 +          long readBytes = filePos - startOffset;
 +          long remainingBytes = Math.max(endOffset - filePos, 0);
 +          return Math.min(1.0f, (float) (readBytes) / (float) (readBytes + remainingBytes));
 +        }
 +      } catch (IOException e) {
 +        LOG.error(e.getMessage(), e);
 +        return 0.0f;
 +      }
 +    }
 +
 +    @Override
 +    public Tuple next() throws IOException {
++
++      if (!reader.isReadable()) {
++        return null;
++      }
++
++      if (targets.length == 0) {
++        return EmptyTuple.get();
++      }
++
++      VTuple tuple = new VTuple(schema.size());
++
 +      try {
-         if (!reader.isReadable()) return null;
 +
-         ByteBuf buf = readLine();
-         if (buf == null) return null;
++        // this loop continues until one tuple is built or EOS (end of stream) is reached.
++        do {
 +
-         if (targets.length == 0) {
-           return EmptyTuple.get();
-         }
++          ByteBuf buf = reader.readLine();
++          if (buf == null) {
++            return null;
++          }
 +
-         VTuple tuple = new VTuple(schema.size());
-         fillTuple(schema, tuple, buf, targetColumnIndexes);
-         return tuple;
-       } catch (Throwable t) {
 -         LOG.error("Tuple list current index: " + recordCount + " file offset:" + reader.getCompressedPosition(), t);
-         throw new IOException(t);
-       }
-     }
++          try {
 +
 -     private void fillTuple(Schema schema, Tuple dst, ByteBuf lineBuf, int[] target) throws IOException {
-       int[] projection = target;
-       if (lineBuf == null || target == null || target.length == 0) {
-         return;
-       }
++            deserializer.deserialize(buf, tuple);
++            // if a line is read normally, it exits this loop.
++            break;
 +
-       final int rowLength = lineBuf.readableBytes();
-       int start = 0, fieldLength = 0, end = 0;
++          } catch (TextLineParsingError tae) {
 +
-       //Projection
-       int currentTarget = 0;
-       int currentIndex = 0;
++            errorNum++;
 +
-       while (end != -1) {
-         end = lineBuf.forEachByte(start, rowLength - start, processor);
++            // suppress excessive log output, which would probably degrade performance
++            if (errorNum < errorPrintOutMaxNum) {
++              LOG.warn("Ignore Text Parse Error (" + errorNum + "): ", tae);
++            }
 +
-         if (end < 0) {
-           fieldLength = rowLength - start;
-         } else {
-           fieldLength = end - start;
-         }
++            // Only when a maximum error tolerance limit is set (i.e., errorToleranceMaxNum >= 0)
++            // does it check whether the number of parsing errors exceeds the limit.
++            // Otherwise, all parsing errors are ignored.
++            if (errorToleranceMaxNum >= 0 && errorNum > errorToleranceMaxNum) {
++              throw tae;
++            }
++            continue;
++          }
 +
 -         if (projection.length > currentTarget && currentIndex == projection[currentTarget]) {
-           lineBuf.setIndex(start, start + fieldLength);
 -           Datum datum = serde.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars);
-           dst.put(currentIndex, datum);
-           currentTarget++;
-         }
++        } while (reader.isReadable()); // continue until EOS
 +
-         if (projection.length == currentTarget) {
-           break;
-         }
++        // recordCount is the number of records actually read; we increment it here.
++        recordCount++;
 +
-         start = end + 1;
-         currentIndex++;
++        return tuple;
++
++      } catch (Throwable t) {
++        LOG.error(t);
++        throw new IOException(t);
 +      }
 +    }
 +
 +    @Override
 +    public void reset() throws IOException {
 +      init();
 +    }
 +
 +    @Override
 +    public void close() throws IOException {
 +      try {
-         if (nullChars != null) {
-           nullChars.release();
-           nullChars = null;
++        if (deserializer != null) {
++          deserializer.release();
 +        }
 +
 +        if (tableStats != null && reader != null) {
 +          tableStats.setReadBytes(reader.getReadBytes());  //Actual processed bytes (decompressed bytes + overhead)
 +          tableStats.setNumRows(recordCount);
 +        }
 +        if (LOG.isDebugEnabled()) {
 +          LOG.debug("DelimitedTextFileScanner processed record:" + 
recordCount);
 +        }
 +      } finally {
 +        IOUtils.cleanup(LOG, reader);
 +        reader = null;
 +      }
 +    }
 +
 +    @Override
 +    public boolean isProjectable() {
 +      return true;
 +    }
 +
 +    @Override
 +    public boolean isSelectable() {
 +      return false;
 +    }
 +
 +    @Override
 +    public void setSearchCondition(Object expr) {
 +    }
 +
 +    @Override
 +    public boolean isSplittable() {
 +      return splittable;
 +    }
 +
 +    @Override
 +    public TableStats getInputStats() {
 +      if (tableStats != null && reader != null) {
 +        tableStats.setReadBytes(reader.getReadBytes());  //Actual processed bytes (decompressed bytes + overhead)
 +        tableStats.setNumRows(recordCount);
 +        tableStats.setNumBytes(fragment.getLength());
 +      }
 +      return tableStats;
 +    }
 +  }
 +}
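
Taken together, the rewritten DelimitedTextFile is tunable through two table properties: 'text.serde.class' (read by getLineSerde()) swaps the line serde implementation, and the error-tolerance option bounds how many malformed lines next() skips before rethrowing; a negative value skips them all. A hedged configuration sketch follows; the StorageConstants keys come from the patch above, but CatalogUtil.newTableMeta(), putOption(), and the StoreType value are assumptions about the surrounding Tajo API:

    // Sketch only; verify the TableMeta construction against the actual API.
    TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);  // StoreType value is assumed
    // Select the line serde; CSVLineSerDe is the documented default.
    meta.putOption(StorageConstants.TEXT_SERDE_CLASS,
        "org.apache.tajo.storage.text.CSVLineSerDe");
    // Tolerate up to 5 parse errors per scan; a negative value ignores them all.
    meta.putOption(StorageConstants.TEXT_ERROR_TOLERANCE_MAXNUM, "5");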
