Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into hbase_storage

Conflicts:
        tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
        tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
        tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
        tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
        tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/03c3ea29
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/03c3ea29
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/03c3ea29

Branch: refs/heads/hbase_storage
Commit: 03c3ea2904e1e821b24e8e441b91ea7fed55df35
Parents: 85627a5 55084a8
Author: HyoungJun Kim <[email protected]>
Authored: Wed Nov 12 15:37:57 2014 +0900
Committer: HyoungJun Kim <[email protected]>
Committed: Wed Nov 12 15:37:57 2014 +0900

----------------------------------------------------------------------
 BUILDING                                        |   2 +-
 CHANGES                                         |  28 +
 README                                          |   2 +-
 .../org/apache/tajo/catalog/CatalogUtil.java    |  12 +-
 .../src/main/proto/CatalogProtos.proto          |   1 +
 .../tajo/catalog/store/HCatalogStore.java       |  16 +-
 .../apache/tajo/catalog/store/HCatalogUtil.java |   2 +-
 .../tajo/catalog/store/TestHCatalogStore.java   |  18 +-
 tajo-catalog/tajo-catalog-server/pom.xml        |   6 +
 .../tajo/catalog/store/AbstractDBStore.java     |  96 ++-
 .../apache/tajo/catalog/store/DerbyStore.java   | 330 +--------
 .../apache/tajo/catalog/store/OracleStore.java  | 337 +--------
 .../tajo/catalog/store/PostgreSQLStore.java     | 293 +-------
 .../catalog/store/XMLCatalogSchemaManager.java  | 698 +++++++++++++++++++
 .../tajo/catalog/store/object/BaseSchema.java   |  76 ++
 .../catalog/store/object/DatabaseObject.java    |  80 +++
 .../store/object/DatabaseObjectType.java        |  48 ++
 .../tajo/catalog/store/object/SQLObject.java    |  52 ++
 .../tajo/catalog/store/object/SchemaPatch.java  |  78 +++
 .../tajo/catalog/store/object/StoreObject.java  |  87 +++
 .../resources/schemas/DBMSSchemaDefinition.xsd  | 177 +++++
 .../main/resources/schemas/derby/columns.sql    |   8 -
 .../main/resources/schemas/derby/databases.sql  |   6 -
 .../resources/schemas/derby/databases_idx.sql   |   1 -
 .../src/main/resources/schemas/derby/derby.xml  | 186 +++++
 .../main/resources/schemas/derby/indexes.sql    |  12 -
 .../schemas/derby/partition_methods.sql         |   6 -
 .../main/resources/schemas/derby/partitions.sql |  10 -
 .../src/main/resources/schemas/derby/stats.sql  |   6 -
 .../schemas/derby/table_properties.sql          |   6 -
 .../src/main/resources/schemas/derby/tables.sql |  10 -
 .../resources/schemas/derby/tablespaces.sql     |   7 -
 .../main/resources/schemas/oracle/oracle.xml    | 218 ++++++
 .../resources/schemas/postgresql/indexes.sql    |  14 -
 .../resources/schemas/postgresql/postgresql.xml | 203 ++++++
 .../main/resources/schemas/postgresql/stats.sql |   6 -
 .../store/TestXMLCatalogSchemaManager.java      | 496 +++++++++++++
 .../schemas/derbytest/loadtest/derby.xml        | 191 +++++
 .../derbytest/mergetest/base_version_1.xml      |  35 +
 .../derbytest/mergetest/base_version_2.xml      |  63 ++
 .../schemas/derbytest/querytest/derby.xml       |  78 +++
 .../derbytest/upgradetest/base_version_2.xml    |  57 ++
 .../cli/tsql/DefaultTajoCliOutputFormatter.java |   1 -
 .../cli/tsql/commands/DescTableCommand.java     |   1 -
 .../org/apache/tajo/client/QueryClient.java     |   6 +
 .../org/apache/tajo/client/QueryClientImpl.java |  52 ++
 .../org/apache/tajo/client/TajoClientImpl.java  |  13 +-
 tajo-client/src/main/proto/ClientProtos.proto   |  60 ++
 .../main/proto/QueryMasterClientProtocol.proto  |   1 +
 .../main/proto/TajoMasterClientProtocol.proto   |   1 +
 tajo-common/pom.xml                             |   4 -
 .../main/java/org/apache/tajo/SessionVars.java  |   7 +-
 .../java/org/apache/tajo/conf/TajoConf.java     |  28 +-
 .../org/apache/tajo/storage/EmptyTuple.java     | 176 +++++
 .../apache/tajo/storage/StorageConstants.java   |   9 +
 .../tajo/util/datetime/DateTimeConstants.java   |  22 +
 .../tajo/util/datetime/DateTimeFormat.java      |   2 +-
 .../apache/tajo/util/datetime/DateTimeUtil.java |  96 ++-
 .../apache/tajo/util/TestDateTimeFormat.java    |  12 +-
 .../org/apache/tajo/util/TestDateTimeUtil.java  |  79 ++-
 .../java/org/apache/tajo/benchmark/TPCH.java    |   2 +-
 .../datetime/DateTimePartFromUnixTimestamp.java |  18 +-
 .../apache/tajo/engine/parser/SQLAnalyzer.java  |   8 +-
 .../engine/planner/PhysicalPlannerImpl.java     |   2 +-
 .../planner/physical/PhysicalPlanUtil.java      |  41 +-
 .../org/apache/tajo/master/GlobalEngine.java    |   4 +
 .../java/org/apache/tajo/master/TajoMaster.java |  21 +
 .../tajo/master/TajoMasterClientService.java    |  33 +
 .../apache/tajo/master/querymaster/Query.java   |  41 +-
 .../master/querymaster/QueryInProgress.java     |   3 +
 .../tajo/master/querymaster/QueryInfo.java      |  79 ++-
 .../tajo/master/querymaster/QueryMaster.java    |  89 +--
 .../tajo/master/querymaster/QueryUnit.java      |  79 +++
 .../tajo/master/querymaster/SubQuery.java       |  76 ++
 .../main/java/org/apache/tajo/util/JSPUtil.java | 162 ++++-
 .../org/apache/tajo/util/history/History.java   |  27 +
 .../tajo/util/history/HistoryCleaner.java       | 136 ++++
 .../apache/tajo/util/history/HistoryReader.java | 308 ++++++++
 .../apache/tajo/util/history/HistoryWriter.java | 450 ++++++++++++
 .../apache/tajo/util/history/QueryHistory.java  | 151 ++++
 .../tajo/util/history/QueryUnitHistory.java     | 167 +++++
 .../tajo/util/history/SubQueryHistory.java      | 270 +++++++
 .../tajo/webapp/QueryExecutorServlet.java       |  17 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |  20 +
 .../tajo/worker/TajoWorkerClientService.java    |  34 +
 .../main/java/org/apache/tajo/worker/Task.java  |   5 +-
 .../org/apache/tajo/worker/TaskHistory.java     |   8 +-
 .../src/main/resources/webapps/admin/query.jsp  |  72 +-
 .../resources/webapps/admin/query_executor.jsp  |  16 +-
 .../resources/webapps/admin/querydetail.jsp     | 116 +++
 .../main/resources/webapps/admin/querytasks.jsp | 249 +++++++
 .../main/resources/webapps/admin/queryunit.jsp  | 134 ++++
 .../resources/webapps/worker/querydetail.jsp    |  43 +-
 .../resources/webapps/worker/querytasks.jsp     | 126 ++--
 .../resources/webapps/worker/taskhistory.jsp    | 123 ++++
 .../test/java/org/apache/tajo/TpchTestBase.java |   2 +-
 .../apache/tajo/cli/tools/TestDDLBuilder.java   |   2 +-
 .../org/apache/tajo/cli/tsql/TestTajoCli.java   |  36 +
 .../org/apache/tajo/client/TestTajoClient.java  |  56 +-
 .../engine/function/TestDateTimeFunctions.java  |  15 +-
 .../planner/physical/TestBSTIndexExec.java      |   3 -
 .../apache/tajo/engine/query/TestCTASQuery.java |   2 +-
 .../tajo/engine/query/TestGroupByQuery.java     |  17 +-
 .../tajo/engine/query/TestJoinBroadcast.java    |   4 +-
 .../apache/tajo/engine/query/TestJoinQuery.java |  16 +-
 .../tajo/engine/query/TestNullValues.java       |  14 +-
 .../tajo/engine/query/TestSelectQuery.java      |   4 +-
 .../apache/tajo/engine/query/TestSortQuery.java |  12 +-
 .../tajo/engine/query/TestTablePartitions.java  |   4 +-
 .../org/apache/tajo/jdbc/TestResultSet.java     |   2 +-
 .../java/org/apache/tajo/util/TestJSPUtil.java  |  37 +-
 .../util/history/TestHistoryWriterReader.java   | 251 +++++++
 .../results/TestTajoCli/testDescTable.result    |   4 +-
 .../TestTajoCli/testNonForwardQueryPause.result |   5 +
 .../results/TestTajoDump/testDump1.result       |   2 +-
 .../testBuildDDLForBaseTable.result             |   2 +-
 .../testBuildDDLForExternalTable.result         |   2 +-
 .../testBuildDDLQuotedTableName1.result         |   2 +-
 .../testBuildDDLQuotedTableName2.result         |   2 +-
 .../main/sphinx/getting_started/first_query.rst |   2 +-
 .../sphinx/getting_started/prerequisites.rst    |   6 +-
 tajo-docs/src/main/sphinx/sql_language/ddl.rst  |   2 +-
 .../src/main/sphinx/table_management/csv.rst    |  10 +-
 tajo-docs/src/main/sphinx/tsql/variables.rst    |   2 +-
 tajo-project/pom.xml                            |   6 +-
 tajo-rpc/pom.xml                                |   4 -
 tajo-storage/pom.xml                            |   4 +
 .../storage/BinarySerializerDeserializer.java   |   1 +
 .../org/apache/tajo/storage/BufferPool.java     |  74 ++
 .../tajo/storage/ByteBufInputChannel.java       |  76 ++
 .../java/org/apache/tajo/storage/CSVFile.java   |  23 +-
 .../storage/FieldSerializerDeserializer.java    |  35 +
 .../java/org/apache/tajo/storage/RawFile.java   | 185 ++---
 .../tajo/storage/SerializerDeserializer.java    |   2 +-
 .../storage/TextSerializerDeserializer.java     |   1 +
 .../tajo/storage/text/ByteBufLineReader.java    | 154 ++++
 .../tajo/storage/text/DelimitedLineReader.java  | 157 +++++
 .../tajo/storage/text/DelimitedTextFile.java    | 484 +++++++++++++
 .../tajo/storage/text/FieldSplitProcessor.java  |  38 +
 .../tajo/storage/text/LineSplitProcessor.java   |  45 ++
 .../text/TextFieldSerializerDeserializer.java   | 227 ++++++
 .../src/main/resources/storage-default.xml      |  59 +-
 .../tajo/storage/TestCompressionStorages.java   |  76 +-
 .../org/apache/tajo/storage/TestLineReader.java | 163 +++++
 .../apache/tajo/storage/TestSplitProcessor.java |  72 ++
 .../org/apache/tajo/storage/TestStorages.java   |  18 +-
 .../src/test/resources/storage-default.xml      |  18 +-
 147 files changed, 8574 insertions(+), 1624 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/CHANGES
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --cc 
tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 235d827,b3b4dbb..e30b4f9
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@@ -41,8 -41,10 +41,9 @@@ import org.apache.tajo.rpc.NettyClientB
  import org.apache.tajo.rpc.NullCallback;
  import org.apache.tajo.rpc.RpcConnectionPool;
  import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 -import org.apache.tajo.storage.StorageManager;
  import org.apache.tajo.util.HAServiceUtil;
  import org.apache.tajo.util.NetUtils;
+ import org.apache.tajo.util.history.QueryHistory;
  import org.apache.tajo.worker.TajoWorker;
  
  import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --cc 
tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index c713660,0f275e9..6bc185e
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@@ -42,9 -43,10 +43,11 @@@ import org.apache.tajo.master.event.Que
  import org.apache.tajo.plan.logical.*;
  import org.apache.tajo.storage.DataLocation;
  import org.apache.tajo.storage.fragment.FileFragment;
 +import org.apache.tajo.storage.fragment.Fragment;
+ import org.apache.tajo.storage.fragment.FragmentConvertor;
  import org.apache.tajo.util.Pair;
  import org.apache.tajo.util.TajoIdUtils;
+ import org.apache.tajo.util.history.QueryUnitHistory;
  import org.apache.tajo.worker.FetchImpl;
  
  import java.net.URI;

http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --cc 
tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index ec9afaa,96534df..18d4c28
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@@ -63,9 -62,10 +63,11 @@@ import org.apache.tajo.plan.util.Planne
  import org.apache.tajo.plan.logical.*;
  import org.apache.tajo.storage.StorageManager;
  import org.apache.tajo.storage.fragment.FileFragment;
 +import org.apache.tajo.storage.fragment.Fragment;
  import org.apache.tajo.unit.StorageUnit;
  import org.apache.tajo.util.KeyValueSet;
+ import org.apache.tajo.util.history.QueryUnitHistory;
+ import org.apache.tajo.util.history.SubQueryHistory;
  import org.apache.tajo.worker.FetchImpl;
  
  import java.io.IOException;
@@@ -281,8 -282,10 +283,9 @@@ public class SubQuery implements EventH
    private TaskSchedulerContext schedulerContext;
    private List<IntermediateEntry> hashShuffleIntermediateEntries = new 
ArrayList<IntermediateEntry>();
    private AtomicInteger completeReportReceived = new AtomicInteger(0);
+   private SubQueryHistory finalSubQueryHistory;
  
 -  public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan 
masterPlan,
 -                  ExecutionBlock block, StorageManager sm) {
 +  public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan 
masterPlan, ExecutionBlock block) {
      this.context = context;
      this.masterPlan = masterPlan;
      this.block = block;

http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --cc tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
index f72a5a1,2fae243..de3912a
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@@ -32,7 -32,7 +32,8 @@@ import org.apache.tajo.common.TajoDataT
  import org.apache.tajo.datum.DatumFactory;
  import org.apache.tajo.datum.NullDatum;
  import org.apache.tajo.datum.ProtobufDatumFactory;
+ import org.apache.tajo.storage.fragment.FileFragment;
 +import org.apache.tajo.storage.fragment.Fragment;
  import org.apache.tajo.unit.StorageUnit;
  import org.apache.tajo.util.BitArray;
  
@@@ -57,13 -57,16 +58,16 @@@ public class RawFile 
      private int headerSize = 0; // Header size of a tuple
      private BitArray nullFlags;
      private static final int RECORD_SIZE = 4;
-     private boolean eof = false;
-     private long fileLimit; // If this.fragment represents a complete file, 
this value is equal to the file's size
-     private long numBytesRead;
+     private boolean eos = false;
+     private long startOffset;
+     private long endOffset;
      private FileInputStream fis;
      private long recordCount;
+     private long totalReadBytes;
+     private long filePosition;
+     private boolean forceFillBuffer;
  
 -    public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, 
FileFragment fragment) throws IOException {
 +    public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, 
Fragment fragment) throws IOException {
        super(conf, schema, meta, fragment);
      }
  
@@@ -81,22 -84,16 +85,16 @@@
  
        fis = new FileInputStream(file);
        channel = fis.getChannel();
-       fileLimit = fragment.getStartKey() + fragment.getLength(); // fileLimit 
is less than or equal to fileSize
+       filePosition = startOffset = fragment.getStartKey();
 -      endOffset = fragment.getStartKey() + fragment.getEndKey();
++      endOffset = fragment.getStartKey() + fragment.getLength();
  
-       if (tableStats != null) {
-         tableStats.setNumBytes(fragment.getLength());
-       }
        if (LOG.isDebugEnabled()) {
-         LOG.debug("RawFileScanner open:" + fragment + "," + 
channel.position() + ", total file size :" + channel.size()
-             + ", fragment size :" + fragment.getLength() + ", fileLimit: " + 
fileLimit);
+         LOG.debug("RawFileScanner open:" + fragment + "," + 
channel.position() + ", file size :" + channel.size()
 -            + ", fragment length :" + fragment.getEndKey());
++            + ", fragment length :" + fragment.getLength());
        }
  
-       if (fragment.getLength() < 64 * StorageUnit.KB) {
-             bufferSize = (int)fragment.getLength();
-       } else {
-             bufferSize = 64 * StorageUnit.KB;
-       }
-       buffer = ByteBuffer.allocateDirect(bufferSize);
+       buf = BufferPool.directBuffer(64 * StorageUnit.KB);
+       buffer = buf.nioBuffer(0, buf.capacity());
  
        columnTypes = new DataType[schema.size()];
        for (int i = 0; i < schema.size(); i++) {
@@@ -124,16 -120,22 +121,22 @@@
  
      @Override
      public void seek(long offset) throws IOException {
-       long currentPos = channel.position();
-       if(currentPos < offset &&  offset < currentPos + buffer.limit()){
-         buffer.position((int)(offset - currentPos));
+       eos = false;
+       filePosition = channel.position();
+ 
+       // do not fill the buffer if the offset is already included in the 
buffer.
+       if(!forceFillBuffer && filePosition > offset && offset > filePosition - 
buffer.limit()){
+         buffer.position((int)(offset - (filePosition - buffer.limit())));
        } else {
-         buffer.clear();
 -        if(offset < startOffset || offset > startOffset + 
fragment.getEndKey()){
++        if(offset < startOffset || offset > startOffset + 
fragment.getLength()){
+           throw new IndexOutOfBoundsException(String.format("range(%d, %d), 
offset: %d",
 -              startOffset, startOffset + fragment.getEndKey(), offset));
++              startOffset, startOffset + fragment.getLength(), offset));
+         }
          channel.position(offset);
-         int bytesRead = channel.read(buffer);
-         numBytesRead = bytesRead;
-         buffer.flip();
-         eof = false;
+         filePosition = offset;
+         buffer.clear();
+         forceFillBuffer = true;
+         fillBuffer();
        }
      }
  
@@@ -421,28 -428,25 +429,25 @@@
      }
  
      @Override
-     public float getProgress() {
-       try {
+     public TableStats getInputStats() {
+       if(tableStats != null){
          tableStats.setNumRows(recordCount);
-         long filePos = 0;
-         if (channel != null) {
-           filePos = channel.position();
-           tableStats.setReadBytes(filePos);
-         }
+         tableStats.setReadBytes(totalReadBytes); // actual read bytes (scan + 
rescan * n)
 -        tableStats.setNumBytes(fragment.getEndKey());
++        tableStats.setNumBytes(fragment.getLength());
+       }
+       return tableStats;
+     }
  
-         if(eof || channel == null) {
-           tableStats.setReadBytes(fragment.getLength());
-           return 1.0f;
-         }
+     @Override
+     public float getProgress() {
+       if(eos) {
+         return 1.0f;
+       }
  
-         if (filePos == 0) {
-           return 0.0f;
-         } else {
-           return Math.min(1.0f, ((float)filePos / 
(float)fragment.getLength()));
-         }
-       } catch (IOException e) {
-         LOG.error(e.getMessage(), e);
+       if (filePosition - startOffset == 0) {
          return 0.0f;
+       } else {
+         return Math.min(1.0f, ((float) filePosition / endOffset));
        }
      }
    }

http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
----------------------------------------------------------------------
diff --cc 
tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
index 0000000,eb1929e..d9e2016
mode 000000,100644..100644
--- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
+++ 
b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
@@@ -1,0 -1,157 +1,157 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.tajo.storage.text;
+ 
+ import io.netty.buffer.ByteBuf;
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FSDataInputStream;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.io.IOUtils;
+ import org.apache.hadoop.io.compress.CompressionCodec;
+ import org.apache.hadoop.io.compress.CompressionCodecFactory;
+ import org.apache.hadoop.io.compress.Decompressor;
+ import org.apache.hadoop.io.compress.SplittableCompressionCodec;
+ import org.apache.tajo.common.exception.NotImplementedException;
+ import org.apache.tajo.conf.TajoConf;
+ import org.apache.tajo.storage.ByteBufInputChannel;
+ import org.apache.tajo.storage.FileScanner;
+ import org.apache.tajo.storage.BufferPool;
+ import org.apache.tajo.storage.compress.CodecPool;
+ import org.apache.tajo.storage.fragment.FileFragment;
+ 
+ import java.io.Closeable;
+ import java.io.DataInputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.util.concurrent.atomic.AtomicInteger;
+ 
+ public class DelimitedLineReader implements Closeable {
+   private static final Log LOG = LogFactory.getLog(DelimitedLineReader.class);
+   private final static int DEFAULT_PAGE_SIZE = 128 * 1024;
+ 
+   private FileSystem fs;
+   private FSDataInputStream fis;
+   private InputStream is; //decompressd stream
+   private CompressionCodecFactory factory;
+   private CompressionCodec codec;
+   private Decompressor decompressor;
+ 
+   private long startOffset, end, pos;
+   private boolean eof = true;
+   private ByteBufLineReader lineReader;
+   private AtomicInteger tempReadBytes = new AtomicInteger();
+   private FileFragment fragment;
+   private Configuration conf;
+ 
+   public DelimitedLineReader(Configuration conf, final FileFragment fragment) 
throws IOException {
+     this.fragment = fragment;
+     this.conf = conf;
+     this.factory = new CompressionCodecFactory(conf);
+     this.codec = factory.getCodec(fragment.getPath());
+     if (this.codec instanceof SplittableCompressionCodec) {
+       throw new NotImplementedException(); // bzip2 does not support 
multi-thread model
+     }
+   }
+ 
+   public void init() throws IOException {
+     if (fs == null) {
+       fs = FileScanner.getFileSystem((TajoConf) conf, fragment.getPath());
+     }
+     if (fis == null) fis = fs.open(fragment.getPath());
+     pos = startOffset = fragment.getStartKey();
 -    end = startOffset + fragment.getEndKey();
++    end = startOffset + fragment.getLength();
+ 
+     if (codec != null) {
+       decompressor = CodecPool.getDecompressor(codec);
+       is = new DataInputStream(codec.createInputStream(fis, decompressor));
+       ByteBufInputChannel channel = new ByteBufInputChannel(is);
+       lineReader = new ByteBufLineReader(channel, 
BufferPool.directBuffer(DEFAULT_PAGE_SIZE));
+     } else {
+       fis.seek(startOffset);
+       is = fis;
+ 
+       ByteBufInputChannel channel = new ByteBufInputChannel(is);
+       lineReader = new ByteBufLineReader(channel,
+           BufferPool.directBuffer((int) Math.min(DEFAULT_PAGE_SIZE, end)));
+     }
+     eof = false;
+   }
+ 
+   public long getCompressedPosition() throws IOException {
+     long retVal;
+     if (isCompressed()) {
+       retVal = fis.getPos();
+     } else {
+       retVal = pos;
+     }
+     return retVal;
+   }
+ 
+   public long getUnCompressedPosition() throws IOException {
+     return pos;
+   }
+ 
+   public long getReadBytes() {
+     return pos - startOffset;
+   }
+ 
+   public boolean isReadable() {
+     return !eof;
+   }
+ 
+   public ByteBuf readLine() throws IOException {
+     if (eof) {
+       return null;
+     }
+ 
+     ByteBuf buf = lineReader.readLineBuf(tempReadBytes);
+     if (buf == null) {
+       eof = true;
+     } else {
+       pos += tempReadBytes.get();
+     }
+ 
+     if (!isCompressed() && getCompressedPosition() > end) {
+       eof = true;
+     }
+     return buf;
+   }
+ 
+   public boolean isCompressed() {
+     return codec != null;
+   }
+ 
+   @Override
+   public void close() throws IOException {
+     try {
+       IOUtils.cleanup(LOG, lineReader, is, fis);
+       fs = null;
+       is = null;
+       fis = null;
+       lineReader = null;
+     } finally {
+       if (decompressor != null) {
+         CodecPool.returnDecompressor(decompressor);
+         decompressor = null;
+       }
+     }
+   }
+ }

http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --cc 
tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index 0000000,dbf8435..aad97bc
mode 000000,100644..100644
--- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ 
b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@@ -1,0 -1,483 +1,484 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.tajo.storage.text;
+ 
+ import io.netty.buffer.ByteBuf;
+ import org.apache.commons.lang.StringEscapeUtils;
+ import org.apache.commons.lang.StringUtils;
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FSDataOutputStream;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.io.IOUtils;
+ import org.apache.hadoop.io.compress.CompressionCodec;
+ import org.apache.hadoop.io.compress.CompressionCodecFactory;
+ import org.apache.hadoop.io.compress.CompressionOutputStream;
+ import org.apache.hadoop.io.compress.Compressor;
+ import org.apache.hadoop.util.ReflectionUtils;
+ import org.apache.tajo.catalog.Schema;
+ import org.apache.tajo.catalog.TableMeta;
+ import org.apache.tajo.catalog.statistics.TableStats;
+ import org.apache.tajo.datum.Datum;
+ import org.apache.tajo.datum.NullDatum;
+ import org.apache.tajo.storage.*;
+ import org.apache.tajo.storage.compress.CodecPool;
+ import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
+ import org.apache.tajo.storage.fragment.FileFragment;
++import org.apache.tajo.storage.fragment.Fragment;
+ import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+ 
+ import java.io.BufferedOutputStream;
+ import java.io.DataOutputStream;
+ import java.io.FileNotFoundException;
+ import java.io.IOException;
+ import java.util.Arrays;
+ 
+ public class DelimitedTextFile {
+ 
+   public static final byte LF = '\n';
+   public static int EOF = -1;
+ 
+   private static final Log LOG = LogFactory.getLog(DelimitedTextFile.class);
+ 
+   public static class DelimitedTextFileAppender extends FileAppender {
+     private final TableMeta meta;
+     private final Schema schema;
+     private final int columnNum;
+     private final FileSystem fs;
+     private FSDataOutputStream fos;
+     private DataOutputStream outputStream;
+     private CompressionOutputStream deflateFilter;
+     private char delimiter;
+     private TableStatistics stats = null;
+     private Compressor compressor;
+     private CompressionCodecFactory codecFactory;
+     private CompressionCodec codec;
+     private Path compressedPath;
+     private byte[] nullChars;
+     private int BUFFER_SIZE = 128 * 1024;
+     private int bufferedBytes = 0;
+     private long pos = 0;
+ 
+     private NonSyncByteArrayOutputStream os;
+     private FieldSerializerDeserializer serde;
+ 
+     public DelimitedTextFileAppender(Configuration conf, final Schema schema, 
final TableMeta meta, final Path path)
+         throws IOException {
+       super(conf, schema, meta, path);
+       this.fs = path.getFileSystem(conf);
+       this.meta = meta;
+       this.schema = schema;
+       this.delimiter = 
StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.TEXT_DELIMITER,
+           StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
+       this.columnNum = schema.size();
+ 
+       String nullCharacters = 
StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.TEXT_NULL,
+           NullDatum.DEFAULT_TEXT));
+       if (StringUtils.isEmpty(nullCharacters)) {
+         nullChars = NullDatum.get().asTextBytes();
+       } else {
+         nullChars = nullCharacters.getBytes();
+       }
+     }
+ 
+     /**
+      * Opens the output file and prepares the appender for writing.
+      * <p>
+      * When the table meta names a compression codec, data is written through that codec to
+      * {@code path} plus the codec's default extension; otherwise it is written to {@code path}
+      * through a buffered stream. In both cases the parent directory must exist and the target
+      * file must not exist yet.
+      *
+      * @throws FileNotFoundException         if the parent directory of {@code path} is missing
+      * @throws AlreadyExistsStorageException if the target file already exists
+      * @throws IOException                   if the file cannot be created or the serializer
+      *                                       cannot be instantiated
+      */
+     @Override
+     public void init() throws IOException {
+       if (!fs.exists(path.getParent())) {
+         throw new FileNotFoundException(path.toString());
+       }
+ 
+       if (!this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) {
+         // Plain output: write straight to `path` through a buffered stream.
+         if (fs.exists(path)) {
+           throw new AlreadyExistsStorageException(path);
+         }
+         fos = fs.create(path);
+         outputStream = new DataOutputStream(new BufferedOutputStream(fos));
+       } else {
+         // Compressed output: resolve the codec and write to `path` + codec extension.
+         final String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC);
+         codecFactory = new CompressionCodecFactory(conf);
+         codec = codecFactory.getCodecByClassName(codecName);
+         compressor = CodecPool.getCompressor(codec);
+         if (compressor != null) {
+           compressor.reset();  //builtin gzip is null
+         }
+ 
+         compressedPath = path.suffix(codec.getDefaultExtension());
+         if (fs.exists(compressedPath)) {
+           throw new AlreadyExistsStorageException(compressedPath);
+         }
+ 
+         fos = fs.create(compressedPath);
+         deflateFilter = codec.createOutputStream(fos, compressor);
+         outputStream = new DataOutputStream(deflateFilter);
+       }
+ 
+       if (enabledStats) {
+         this.stats = new TableStatistics(this.schema);
+       }
+ 
+       try {
+         // The De/Serializer interface is still under discussion, so the instance is cast to the
+         // built-in text serde; a custom serde class is effectively disabled.
+         final String serdeClass = this.meta.getOption(StorageConstants.TEXTFILE_SERDE,
+             TextFieldSerializerDeserializer.class.getName());
+         serde = (TextFieldSerializerDeserializer) ReflectionUtils.newInstance(Class.forName(serdeClass), conf);
+       } catch (Throwable e) {
+         LOG.error(e.getMessage(), e);
+         throw new IOException(e);
+       }
+ 
+       // The row buffer is allocated once and reused across init() calls.
+       if (os == null) {
+         os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
+       }
+       os.reset();
+ 
+       pos = fos.getPos();
+       bufferedBytes = 0;
+       super.init();
+     }
+ 
+ 
+     /**
+      * Serializes one tuple into the in-memory row buffer as a delimiter-separated line ending
+      * in LF. The buffer is flushed to the output stream once more than {@code BUFFER_SIZE}
+      * bytes have accumulated, and the row counter is bumped when statistics are enabled.
+      *
+      * @param tuple the row to append; columns {@code 0 .. columnNum-1} are read from it
+      * @throws IOException if serialization or flushing fails
+      */
+     @Override
+     public void addTuple(Tuple tuple) throws IOException {
+       int rowBytes = 0;
+ 
+       for (int col = 0; col < columnNum; col++) {
+         // A delimiter separates neighbouring columns; none follows the last column.
+         if (col > 0) {
+           os.write((byte) delimiter);
+           rowBytes++;
+         }
+         final Datum datum = tuple.get(col);
+         rowBytes += serde.serialize(os, datum, schema.getColumn(col), col, nullChars);
+       }
+       os.write(LF);
+       rowBytes++;
+ 
+       pos += rowBytes;
+       bufferedBytes += rowBytes;
+       if (bufferedBytes > BUFFER_SIZE) {
+         flushBuffer();
+       }
+       // Statistical section
+       if (enabledStats) {
+         stats.incrementRow();
+       }
+     }
+ 
+     /**
+      * Copies the buffered rows to the output stream and empties the buffer.
+      * Does nothing when the buffer holds no bytes.
+      *
+      * @throws IOException if writing to the output stream fails
+      */
+     private void flushBuffer() throws IOException {
+       if (os.getLength() <= 0) {
+         return;
+       }
+       os.writeTo(outputStream);
+       os.reset();
+       bufferedBytes = 0;
+     }
+ 
+     /**
+      * Returns the running offset {@code pos}. {@code init()} seeds it from the output stream
+      * position and {@code addTuple()} advances it by the serialized size of every row.
+      *
+      * @return the current value of {@code pos}
+      * @throws IOException declared by the overridden method
+      */
+     @Override
+     public long getOffset() throws IOException {
+       return pos;
+     }
+ 
+     /**
+      * Pushes the buffered rows to the output stream and then flushes that stream.
+      *
+      * @throws IOException if writing or flushing fails
+      */
+     @Override
+     public void flush() throws IOException {
+       flushBuffer();
+       outputStream.flush();
+     }
+ 
+     /**
+      * Flushes buffered rows, records the final byte count in the statistics, finishes the
+      * compression stream when one is open, and releases the file stream and the pooled
+      * compressor.
+      * <p>
+      * The statistics object and the row buffer are both created by {@code init()}. They are
+      * only touched here when they exist, so calling {@code close()} on an appender whose
+      * {@code init()} never ran or failed part-way does not end in a NullPointerException.
+      *
+      * @throws IOException if flushing or finishing the stream fails
+      */
+     @Override
+     public void close() throws IOException {
+       try {
+         if (outputStream != null) {
+           if (os != null) {
+             flush();
+           } else {
+             // init() stopped before the row buffer existed: nothing is buffered, so only the
+             // stream itself needs flushing.
+             outputStream.flush();
+           }
+         }
+ 
+         // Statistical section
+         if (enabledStats && stats != null) {
+           stats.setNumBytes(getOffset());
+         }
+ 
+         if (deflateFilter != null) {
+           deflateFilter.finish();
+           deflateFilter.resetState();
+           deflateFilter = null;
+         }
+ 
+         if (os != null) {
+           os.close();
+         }
+       } finally {
+         IOUtils.cleanup(LOG, fos);
+         if (compressor != null) {
+           CodecPool.returnCompressor(compressor);
+           compressor = null;
+         }
+       }
+     }
+ 
+     /**
+      * Returns the table statistics gathered while appending.
+      *
+      * @return the statistics when they are enabled, otherwise {@code null}
+      */
+     @Override
+     public TableStats getStats() {
+       return enabledStats ? stats.getTableStat() : null;
+     }
+ 
+     /**
+      * Tells whether this appender writes compressed output.
+      * <p>
+      * The answer is based on the codec, not on the pooled compressor. {@code init()} notes
+      * that the compressor can be {@code null} for the built-in gzip codec, and {@code close()}
+      * clears the compressor field, so the compressor alone does not show whether a codec is
+      * in use.
+      *
+      * @return {@code true} once {@code init()} has resolved a compression codec
+      */
+     public boolean isCompress() {
+       return codec != null;
+     }
+ 
+     /**
+      * Returns the file-name extension added for the compression codec.
+      *
+      * @return the codec's default extension, or an empty string when no codec is set
+      */
+     public String getExtension() {
+       if (codec == null) {
+         return "";
+       }
+       return codec.getDefaultExtension();
+     }
+   }
+ 
+   public static class DelimitedTextFileScanner extends FileScanner {
+ 
+     private boolean splittable = false;
+     private final long startOffset;
+     private final long endOffset;
+ 
+     private int recordCount = 0;
+     private int[] targetColumnIndexes;
+ 
+     private ByteBuf nullChars;
+     private FieldSerializerDeserializer serde;
+     private DelimitedLineReader reader;
+     private FieldSplitProcessor processor;
+ 
+     public DelimitedTextFileScanner(Configuration conf, final Schema schema, 
final TableMeta meta,
 -                                    final FileFragment fragment)
++                                    final Fragment fragment)
+         throws IOException { // Captures the fragment range and the field delimiter; init() creates and initializes a fresh reader.
+       super(conf, schema, meta, fragment);
 -      reader = new DelimitedLineReader(conf, fragment);
++      reader = new DelimitedLineReader(conf, this.fragment);
+       if (!reader.isCompressed()) {
+         splittable = true; // only uncompressed input is reported as splittable
+       }
+ 
 -      startOffset = fragment.getStartKey();
 -      endOffset = startOffset + fragment.getEndKey();
++      startOffset = this.fragment.getStartKey();
++      endOffset = startOffset + fragment.getLength(); // NOTE(review): reads the constructor parameter while the line above reads this.fragment - confirm both describe the same range
+ 
+       //Delimiter: only the first character of the unescaped option value is used
+       String delim = meta.getOption(StorageConstants.TEXT_DELIMITER, 
StorageConstants.DEFAULT_FIELD_DELIMITER);
+       this.processor = new 
FieldSplitProcessor(StringEscapeUtils.unescapeJava(delim).charAt(0));
+     }
+ 
+     /**
+      * (Re)initializes the scanner: rebuilds the null-marker buffer, replaces the line reader
+      * with a freshly initialized one, resolves the projected column indexes and instantiates
+      * the field deserializer. When the start offset is greater than zero, the first line read
+      * is discarded.
+      *
+      * @throws IOException if the reader cannot be opened or the deserializer cannot be
+      *                     instantiated
+      */
+     @Override
+     public void init() throws IOException {
+       // Give back the buffer of a previous init() before allocating a new one.
+       if (nullChars != null) {
+         nullChars.release();
+       }
+ 
+       final String nullText = StringEscapeUtils.unescapeJava(
+           meta.getOption(StorageConstants.TEXT_NULL, NullDatum.DEFAULT_TEXT));
+       final byte[] nullBytes = StringUtils.isEmpty(nullText) ? NullDatum.get().asTextBytes() : nullText.getBytes();
+       nullChars = BufferPool.directBuffer(nullBytes.length, nullBytes.length);
+       nullChars.writeBytes(nullBytes);
+ 
+       // Any existing reader is closed and replaced.
+       if (reader != null) {
+         reader.close();
+       }
+       reader = new DelimitedLineReader(conf, fragment);
+       reader.init();
+       recordCount = 0;
+ 
+       // Without an explicit projection every schema column is a target.
+       if (targets == null) {
+         targets = schema.toArray();
+       }
+ 
+       final int targetCount = targets.length;
+       targetColumnIndexes = new int[targetCount];
+       for (int t = 0; t < targetCount; t++) {
+         targetColumnIndexes[t] = schema.getColumnId(targets[t].getQualifiedName());
+       }
+ 
+       try {
+         // The De/Serializer interface is still under discussion, so the instance is cast to the
+         // built-in text serde; a custom serde class is effectively disabled.
+         final String serdeClass = this.meta.getOption(StorageConstants.TEXTFILE_SERDE,
+             TextFieldSerializerDeserializer.class.getName());
+         serde = (TextFieldSerializerDeserializer) ReflectionUtils.newInstance(Class.forName(serdeClass), conf);
+       } catch (Throwable e) {
+         LOG.error(e.getMessage(), e);
+         throw new IOException(e);
+       }
+ 
+       super.init();
+       // fillTuple() walks the fields left to right, so the indexes are kept in ascending order.
+       Arrays.sort(targetColumnIndexes);
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("DelimitedTextFileScanner open:" + fragment.getPath() + "," + startOffset + "," + endOffset);
+       }
+ 
+       // A non-zero start offset presumably lands inside a record, so the first line is skipped.
+       if (startOffset > 0) {
+         reader.readLine();
+       }
+     }
+ 
+     /**
+      * Reads the next line from the reader and counts it as a record.
+      *
+      * @return the line buffer, or {@code null} when the reader has no more lines
+      * @throws IOException if the reader fails
+      */
+     public ByteBuf readLine() throws IOException {
+       final ByteBuf line = reader.readLine();
+       if (line != null) {
+         recordCount++;
+       }
+       return line;
+     }
+ 
+     /**
+      * Estimates the scan progress from the reader position relative to the fragment range.
+      *
+      * @return {@code 1.0} when the reader is exhausted, {@code 0.0} when nothing has been read
+      *         yet or the position cannot be obtained, otherwise the fraction of the range
+      *         {@code [startOffset, endOffset]} already passed, capped at {@code 1.0}
+      */
+     @Override
+     public float getProgress() {
+       try {
+         if (!reader.isReadable()) {
+           return 1.0f;
+         }
+ 
+         final long filePos = reader.getCompressedPosition();
+         if (filePos == startOffset) {
+           return 0.0f;
+         }
+ 
+         final long consumed = filePos - startOffset;
+         // Never negative, even if the reader ran past endOffset.
+         final long remaining = Math.max(endOffset - filePos, 0);
+         return Math.min(1.0f, (float) (consumed) / (float) (consumed + remaining));
+       } catch (IOException e) {
+         LOG.error(e.getMessage(), e);
+         return 0.0f;
+       }
+     }
+ 
+     @Override
+     public Tuple next() throws IOException {
+       try {
+         if (!reader.isReadable()) return null;
+ 
+         ByteBuf buf = readLine();
+         if (buf == null) return null;
+ 
+         if (targets.length == 0) {
+           return EmptyTuple.get();
+         }
+ 
+         VTuple tuple = new VTuple(schema.size());
+         fillTuple(schema, tuple, buf, targetColumnIndexes);
+         return tuple;
+       } catch (Throwable t) {
+         LOG.error("Tuple list current index: " + recordCount + " file 
offset:" + reader.getCompressedPosition(), t);
+         throw new IOException(t);
+       }
+     }
+ 
+     /**
+      * Splits one line into fields with the delimiter processor and stores the projected
+      * columns into {@code dst}.
+      *
+      * @param schema  schema supplying the column definition for each field index
+      * @param dst     tuple receiving each deserialized datum at its column index
+      * @param lineBuf buffer holding a single line; nothing happens when it is {@code null}
+      * @param target  column indexes to materialize, expected in ascending order (init() sorts
+      *                them); nothing happens when {@code null} or empty
+      * @throws IOException if a field cannot be deserialized
+      */
+     private void fillTuple(Schema schema, Tuple dst, ByteBuf lineBuf, int[] target) throws IOException {
+       if (lineBuf == null || target == null || target.length == 0) {
+         return;
+       }
+ 
+       final int rowLength = lineBuf.readableBytes();
+       int fieldStart = 0;   // offset of the field currently examined
+       int columnIndex = 0;  // ordinal of that field within the line
+       int nextTarget = 0;   // cursor into target[]
+ 
+       while (true) {
+         // Index at which the processor stopped, or -1 when the rest of the line is one field.
+         final int delimiterAt = lineBuf.forEachByte(fieldStart, rowLength - fieldStart, processor);
+         final int fieldLength = (delimiterAt < 0 ? rowLength : delimiterAt) - fieldStart;
+ 
+         //Projection
+         if (nextTarget < target.length && columnIndex == target[nextTarget]) {
+           Datum datum = serde.deserialize(lineBuf.slice(fieldStart, fieldLength),
+               schema.getColumn(columnIndex), columnIndex, nullChars);
+           dst.put(columnIndex, datum);
+           nextTarget++;
+         }
+ 
+         // Stop once every requested column is filled or the line has no further delimiter.
+         if (nextTarget == target.length || delimiterAt == -1) {
+           break;
+         }
+ 
+         fieldStart = delimiterAt + 1;
+         columnIndex++;
+       }
+     }
+ 
+     /**
+      * Restarts the scan by running {@code init()} again.
+      *
+      * @throws IOException if re-initialization fails
+      */
+     @Override
+     public void reset() throws IOException {
+       init();
+     }
+ 
+     /**
+      * Releases the null-marker buffer, copies the final read statistics into
+      * {@code tableStats} when both the stats and the reader are available, and closes the
+      * reader in all cases.
+      *
+      * @throws IOException declared by the overridden method
+      */
+     @Override
+     public void close() throws IOException {
+       try {
+         if (nullChars != null) {
+           nullChars.release();
+           nullChars = null;
+         }
+ 
+         if (tableStats != null && reader != null) {
+           //Actual Processed Bytes. (decompressed bytes + overhead)
+           tableStats.setReadBytes(reader.getReadBytes());
+           tableStats.setNumRows(recordCount);
+         }
+ 
+         if (LOG.isDebugEnabled()) {
+           LOG.debug("DelimitedTextFileScanner processed record:" + recordCount);
+         }
+       } finally {
+         // The reader is closed and dropped even if the bookkeeping above failed.
+         IOUtils.cleanup(LOG, reader);
+         reader = null;
+       }
+     }
+ 
+     /**
+      * Reports that this scanner supports column projection.
+      *
+      * @return always {@code true}
+      */
+     @Override
+     public boolean isProjectable() {
+       return true;
+     }
+ 
+     /**
+      * Reports that this scanner does not evaluate search conditions.
+      *
+      * @return always {@code false}
+      */
+     @Override
+     public boolean isSelectable() {
+       return false;
+     }
+ 
+     /**
+      * Intentionally does nothing: the scanner reports itself as not selectable, so the
+      * expression is ignored.
+      *
+      * @param expr search condition; unused
+      */
+     @Override
+     public void setSearchCondition(Object expr) {
+     }
+ 
+     /**
+      * Tells whether the input may be split; the flag is set by the constructor when the
+      * reader reports uncompressed input.
+      *
+      * @return the {@code splittable} flag
+      */
+     @Override
+     public boolean isSplittable() {
+       return splittable;
+     }
+ 
+     @Override
+     public TableStats getInputStats() { // Refreshes read bytes, row count and fragment length on tableStats (when available) and returns it.
+       if (tableStats != null && reader != null) { // skipped when stats are absent or close() has already dropped the reader
+         tableStats.setReadBytes(reader.getReadBytes());  //Actual Processed 
Bytes. (decompressed bytes + overhead)
+         tableStats.setNumRows(recordCount);
 -        tableStats.setNumBytes(fragment.getEndKey());
++        tableStats.setNumBytes(fragment.getLength());
+       }
+       return tableStats;
+     }
+   }
+ }

http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --cc tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
index 0000000,ef6efdf..1a4bdba
mode 000000,100644..100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
@@@ -1,0 -1,163 +1,163 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.tajo.storage;
+ 
+ import io.netty.buffer.ByteBuf;
+ import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.io.IOUtils;
+ import org.apache.hadoop.io.compress.DeflateCodec;
+ import org.apache.tajo.catalog.CatalogUtil;
+ import org.apache.tajo.catalog.Schema;
+ import org.apache.tajo.catalog.TableMeta;
+ import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+ import org.apache.tajo.common.TajoDataTypes.Type;
+ import org.apache.tajo.conf.TajoConf;
+ import org.apache.tajo.datum.DatumFactory;
+ import org.apache.tajo.datum.NullDatum;
+ import org.apache.tajo.storage.fragment.FileFragment;
+ import org.apache.tajo.storage.text.ByteBufLineReader;
+ import org.apache.tajo.storage.text.DelimitedTextFile;
+ import org.apache.tajo.storage.text.DelimitedLineReader;
+ import org.apache.tajo.util.CommonTestingUtil;
+ import org.junit.Test;
+ 
+ import java.io.IOException;
+ import java.util.concurrent.atomic.AtomicInteger;
+ 
+ import static org.junit.Assert.*;
+ 
+ public class TestLineReader {
+       private static String TEST_PATH = "target/test-data/TestLineReader";
+ 
+   @Test
+   public void testByteBufLineReader() throws IOException { // Writes 10,000 text rows, then checks that ByteBufLineReader returns every line and accounts for every byte of the file.
+     TajoConf conf = new TajoConf();
+     Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+     FileSystem fs = testDir.getFileSystem(conf);
+ 
+     Schema schema = new Schema();
+     schema.addColumn("id", Type.INT4);
+     schema.addColumn("age", Type.INT8);
+     schema.addColumn("comment", Type.TEXT);
+     schema.addColumn("comment2", Type.TEXT);
+ 
+     TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
+     Path tablePath = new Path(testDir, "line.data");
 -    FileAppender appender = (FileAppender) 
StorageManager.getStorageManager(conf).getAppender(meta, schema,
++    FileAppender appender = (FileAppender) 
StorageManager.getFileStorageManager(conf).getAppender(meta, schema,
+         tablePath);
+     appender.enableStats();
+     appender.init();
+     int tupleNum = 10000;
+     VTuple vTuple;
+ 
+     for (int i = 0; i < tupleNum; i++) {
+       vTuple = new VTuple(4);
+       vTuple.put(0, DatumFactory.createInt4(i + 1));
+       vTuple.put(1, DatumFactory.createInt8(25l));
+       vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
+       vTuple.put(3, NullDatum.get());
+       appender.addTuple(vTuple);
+     }
+     appender.close();
+ 
+     FileStatus status = fs.getFileStatus(tablePath);
+ 
+     ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(tablePath));
+     assertEquals(status.getLen(), channel.available()); // before anything is read, the channel reports the whole file as available
+     ByteBufLineReader reader = new ByteBufLineReader(channel);
+     assertEquals(status.getLen(), reader.available());
+ 
+     long totalRead = 0;
+     int i = 0;
+     AtomicInteger bytes = new AtomicInteger(); // receives the byte count of each line from readLineBuf()
+     for(;;){
+       ByteBuf buf = reader.readLineBuf(bytes);
+       if(buf == null) break;
+ 
+       totalRead += bytes.get();
+       i++;
+     }
+     IOUtils.cleanup(null, reader, channel, fs); // NOTE(review): this also closes fs; if getFileSystem() hands out a shared cached instance, other tests could be affected - confirm
+     assertEquals(tupleNum, i);
+     assertEquals(status.getLen(), totalRead);
+     assertEquals(status.getLen(), reader.readBytes()); // NOTE(review): queried after the reader was closed above
+   }
+ 
+   @Test
+   public void testLineDelimitedReader() throws IOException { // Writes a Deflate-compressed table and checks that a reader over a fragment ending mid-file still returns every row.
+     TajoConf conf = new TajoConf();
+     Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+     FileSystem fs = testDir.getFileSystem(conf);
+ 
+     Schema schema = new Schema();
+     schema.addColumn("id", Type.INT4);
+     schema.addColumn("age", Type.INT8);
+     schema.addColumn("comment", Type.TEXT);
+     schema.addColumn("comment2", Type.TEXT);
+ 
+     TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
+     meta.putOption("compression.codec", 
DeflateCodec.class.getCanonicalName());
+ 
+     Path tablePath = new Path(testDir, "line1." + 
DeflateCodec.class.getSimpleName());
 -    FileAppender appender = (FileAppender) 
StorageManager.getStorageManager(conf).getAppender(meta, schema,
++    FileAppender appender = (FileAppender) 
StorageManager.getFileStorageManager(conf).getAppender(meta, schema,
+         tablePath);
+     appender.enableStats();
+     appender.init();
+     int tupleNum = 10000;
+     VTuple vTuple;
+ 
+     long splitOffset = 0;
+     for (int i = 0; i < tupleNum; i++) {
+       vTuple = new VTuple(4);
+       vTuple.put(0, DatumFactory.createInt4(i + 1));
+       vTuple.put(1, DatumFactory.createInt8(25l));
+       vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
+       vTuple.put(3, NullDatum.get());
+       appender.addTuple(vTuple);
+ 
+       if(i == (tupleNum / 2)){
+         splitOffset = appender.getOffset(); // appender offset taken half-way through the rows; used below as the fragment length
+       }
+     }
+     String extension = ((DelimitedTextFile.DelimitedTextFileAppender) 
appender).getExtension();
+     appender.close();
+ 
+     tablePath = tablePath.suffix(extension); // the appender wrote to the path plus the codec extension
+     FileFragment fragment = new FileFragment("table", tablePath, 0, 
splitOffset);
+     DelimitedLineReader reader = new DelimitedLineReader(conf, fragment); // 
if file is compressed, will read to EOF
+     assertTrue(reader.isCompressed());
+     assertFalse(reader.isReadable()); // not readable until init() has been called
+     reader.init();
+     assertTrue(reader.isReadable());
+ 
+ 
+     int i = 0;
+     while(reader.isReadable()){
+       ByteBuf buf = reader.readLine();
+       if(buf == null) break;
+       i++;
+     }
+ 
+     IOUtils.cleanup(null, reader, fs); // NOTE(review): this also closes fs; confirm that no shared FileSystem instance is affected
+     assertEquals(tupleNum, i); // every row is returned even though the fragment ends at splitOffset
+ 
+   }
+ }

http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --cc tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
index 8c6c7ad,a3f80cf..b1e3a28
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@@ -789,8 -791,8 +791,8 @@@ public class TestStorages 
  
      TableMeta meta = CatalogUtil.newTableMeta(storeType);
      Path tablePath = new Path(testDir, "Seekable.data");
 -    FileAppender appender = (FileAppender) 
StorageManager.getStorageManager(conf).getAppender(meta, schema,
 +    FileAppender appender = (FileAppender) 
StorageManager.getFileStorageManager(conf).getAppender(meta, schema,
-             tablePath);
+         tablePath);
      appender.enableStats();
      appender.init();
      int tupleNum = 100000;

Reply via email to