Repository: tajo
Updated Branches:
  refs/heads/master e5b30e542 -> 4820610f4


http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
index d177c48..c09e85e 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
@@ -35,7 +35,6 @@ import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -67,6 +66,8 @@ public class WindowAggExec extends UnaryPhysicalExec {
   private boolean [] windowFuncFlags;
   private boolean [] endUnboundedFollowingFlags;
   private boolean [] endCurrentRowFlags;
+  private Tuple currentKey;
+  private Tuple evaluatedTuple;
 
   // operator state
   enum WindowState {
@@ -79,10 +80,9 @@ public class WindowAggExec extends UnaryPhysicalExec {
 
   // Transient state
   boolean firstTime = true;
-  List<Tuple> evaluatedTuples = null;
-  List<Tuple> accumulatedInTuples = null;
-  List<Tuple> nextAccumulatedProjected = null;
-  List<Tuple> nextAccumulatedInTuples = null;
+  TupleList evaluatedTuples = null;
+  TupleList accumulatedInTuples = null;
+  TupleList nextAccumulatedInTuples = null;
   WindowState state = WindowState.NEW_WINDOW;
   Iterator<Tuple> tupleInFrameIterator = null;
 
@@ -105,6 +105,8 @@ public class WindowAggExec extends UnaryPhysicalExec {
       hasPartitionKeys = false;
     }
 
+    currentKey = new VTuple(partitionKeyNum);
+
     if (plan.hasAggFunctions()) {
       functions = plan.getWindowFunctions();
       functionNum = functions.length;
@@ -169,7 +171,7 @@ public class WindowAggExec extends UnaryPhysicalExec {
       schemaForOrderBy = outSchema;
     }
 
-
+    evaluatedTuple = new VTuple(schemaForOrderBy.size());
     nonFunctionColumnNum = plan.getTargets().length - functionNum;
     nonFunctionColumns = new int[nonFunctionColumnNum];
     for (int idx = 0; idx < plan.getTargets().length - functionNum; idx++) {
@@ -193,7 +195,6 @@ public class WindowAggExec extends UnaryPhysicalExec {
 
   @Override
   public Tuple next() throws IOException {
-    Tuple currentKey = null;
     Tuple readTuple = null;
 
     while(!context.isStopped() && state != WindowState.END_OF_TUPLE) {
@@ -212,7 +213,6 @@ public class WindowAggExec extends UnaryPhysicalExec {
         }
 
         if (readTuple != null && hasPartitionKeys) { // get a key tuple
-          currentKey = new VTuple(partitionKeyIds.length);
           for (int i = 0; i < partitionKeyIds.length; i++) {
             currentKey.put(i, readTuple.asDatum(partitionKeyIds[i]));
           }
@@ -244,7 +244,7 @@ public class WindowAggExec extends UnaryPhysicalExec {
 
   private void initWindow() {
     if (firstTime) {
-      accumulatedInTuples = Lists.newArrayList();
+      accumulatedInTuples = new TupleList();
 
       contexts = new FunctionContext[functionNum];
       for(int evalIdx = 0; evalIdx < functionNum; evalIdx++) {
@@ -257,8 +257,10 @@ public class WindowAggExec extends UnaryPhysicalExec {
   private void accumulatingWindow(Tuple currentKey, Tuple inTuple) {
 
    if (lastKey == null || lastKey.equals(currentKey)) { // if the current key 
is the same as the previous key
-      accumulatedInTuples.add(new VTuple(inTuple));
-
+      accumulatedInTuples.add(inTuple);
+      if (lastKey == null) {
+        lastKey = new VTuple(currentKey.size());
+      }
     } else {
       // if the current key is different from the previous key,
      // the current key belongs to the next window frame. 
preAccumulatingNextWindow() will
@@ -267,37 +269,30 @@ public class WindowAggExec extends UnaryPhysicalExec {
       transition(WindowState.EVALUATION);
     }
 
-    lastKey = currentKey;
+    lastKey.put(currentKey.getValues());
   }
 
   private void preAccumulatingNextWindow(Tuple inTuple) {
-    Tuple projectedTuple = new VTuple(outSchema.size());
-    for(int idx = 0; idx < nonFunctionColumnNum; idx++) {
-      projectedTuple.put(idx, inTuple.asDatum(nonFunctionColumns[idx]));
-    }
-    nextAccumulatedProjected = Lists.newArrayList();
-    nextAccumulatedProjected.add(projectedTuple);
-    nextAccumulatedInTuples = Lists.newArrayList();
-    nextAccumulatedInTuples.add(new VTuple(inTuple));
+    nextAccumulatedInTuples = new TupleList();
+    nextAccumulatedInTuples.add(inTuple);
   }
 
   private void evaluationWindowFrame() {
     TupleComparator comp;
 
-    evaluatedTuples = new ArrayList<Tuple>();
+    evaluatedTuples = new TupleList();
 
     for (int i = 0; i <accumulatedInTuples.size(); i++) {
       Tuple inTuple = accumulatedInTuples.get(i);
 
-      Tuple projectedTuple = new VTuple(schemaForOrderBy.size());
       for (int c = 0; c < nonFunctionColumnNum; c++) {
-        projectedTuple.put(c, inTuple.asDatum(nonFunctionColumns[c]));
+        evaluatedTuple.put(c, inTuple.asDatum(nonFunctionColumns[c]));
       }
       for (int c = 0; c < sortKeyColumns.length; c++) {
-        projectedTuple.put(outputColumnNum + c, 
inTuple.asDatum(sortKeyColumns[c]));
+        evaluatedTuple.put(outputColumnNum + c, 
inTuple.asDatum(sortKeyColumns[c]));
       }
 
-      evaluatedTuples.add(projectedTuple);
+      evaluatedTuples.add(evaluatedTuple);
     }
 
     for (int idx = 0; idx < functions.length; idx++) {
@@ -354,4 +349,22 @@ public class WindowAggExec extends UnaryPhysicalExec {
     lastKey = null;
     noMoreTuples = false;
   }
+
+  @Override
+  public void close() throws IOException {
+    if (evaluatedTuples != null) {
+      evaluatedTuples.clear();
+      evaluatedTuples = null;
+    }
+    if (accumulatedInTuples != null) {
+      accumulatedInTuples.clear();
+      accumulatedInTuples = null;
+    }
+    if (nextAccumulatedInTuples != null) {
+      nextAccumulatedInTuples.clear();
+      nextAccumulatedInTuples = null;
+    }
+
+    super.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
index da2f2ad..7279e8e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
@@ -29,6 +29,8 @@ import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.session.Session;
 
 import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
 
 import static 
org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetProto;
 
@@ -42,7 +44,10 @@ public class QueryContext extends OverridableConf {
 
   public QueryContext(TajoConf conf, Session session) {
     super(conf, ConfigKey.ConfigType.QUERY, ConfigKey.ConfigType.SESSION);
-    putAll(session.getAllVariables());
+    Map<String, String> copy = new HashMap<String, 
String>(session.getAllVariables());
+    // Among session variables, timezone must not be copied into the query context.
+    copy.remove("TIMEZONE");
+    putAll(copy);
   }
 
   public QueryContext(TajoConf conf, KeyValueSetProto proto) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java
index addca49..05936be 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java
@@ -20,15 +20,14 @@ package org.apache.tajo.engine.utils;
 
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.engine.planner.physical.TupleList;
+import org.apache.tajo.engine.planner.physical.TupleMap;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.util.Deallocatable;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.Map;
 
 public interface CacheHolder<T> {
 
@@ -52,19 +51,19 @@ public interface CacheHolder<T> {
    * This is a cache-holder for a join table
    * It will release when execution block is finished
    */
-  public static class BroadcastCacheHolder implements CacheHolder<Map<Tuple, 
List<Tuple>>> {
-    private Map<Tuple, List<Tuple>> data;
+  class BroadcastCacheHolder implements CacheHolder<TupleMap<TupleList>> {
+    private TupleMap<TupleList> data;
     private Deallocatable rowBlock;
     private TableStats tableStats;
 
-    public BroadcastCacheHolder(Map<Tuple, List<Tuple>> data, TableStats 
tableStats, Deallocatable rowBlock){
+    public BroadcastCacheHolder(TupleMap<TupleList> data, TableStats 
tableStats, Deallocatable rowBlock){
       this.data = data;
       this.tableStats = tableStats;
       this.rowBlock = rowBlock;
     }
 
     @Override
-    public Map<Tuple, List<Tuple>> getData() {
+    public TupleMap<TupleList> getData() {
       return data;
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
index 1dcbcab..0a04213 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
@@ -19,7 +19,6 @@
 package org.apache.tajo.engine.utils;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
@@ -30,17 +29,13 @@ import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.statistics.ColumnStats;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
-import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.VTuple;
 
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
-import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -177,20 +172,4 @@ public class TupleUtil {
     }
     return new TupleRange(sortSpecs, startTuple, endTuple);
   }
-
-  /**
-   * It creates a tuple of a given size filled with NULL values in all fields
-   * It is usually used in outer join algorithms.
-   *
-   * @param size The number of columns of a creating tuple
-   * @return The created tuple filled with NULL values
-   */
-  public static Tuple createNullPaddedTuple(int size){
-    VTuple aTuple = new VTuple(size);
-    int i;
-    for(i = 0; i < size; i++){
-      aTuple.put(i, DatumFactory.createNullDatum());
-    }
-    return aTuple;
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
index 52c8d79..228c0b8 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
@@ -694,8 +694,7 @@ public class NonForwardQueryResultSystemScanner implements 
NonForwardQueryResult
     @Override
     public Tuple next() throws IOException {
       Tuple aTuple;
-      Tuple outTuple = new VTuple(outColumnNum);
-      
+
       if (isClosed) {
         return null;
       }
@@ -707,7 +706,7 @@ public class NonForwardQueryResultSystemScanner implements 
NonForwardQueryResult
       if (!scanNode.hasQual()) {
         if (currentRow < cachedData.size()) {
           aTuple = cachedData.get(currentRow++);
-          projector.eval(aTuple, outTuple);
+          Tuple outTuple = projector.eval(aTuple);
           outTuple.setOffset(aTuple.getOffset());
           return outTuple;
         }
@@ -716,7 +715,8 @@ public class NonForwardQueryResultSystemScanner implements 
NonForwardQueryResult
         while (currentRow < cachedData.size()) {
           aTuple = cachedData.get(currentRow++);
           if (qual.eval(aTuple).isTrue()) {
-            projector.eval(aTuple, outTuple);
+            Tuple outTuple = projector.eval(aTuple);
+            outTuple.setOffset(aTuple.getOffset());
             return outTuple;
           }
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java 
b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 60349dd..25b306f 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -154,6 +154,7 @@ public class TajoTestingCluster {
     // Memory cache termination
     conf.setIntVar(ConfVars.WORKER_HISTORY_EXPIRE_PERIOD, 1);
 
+    // Python function path
     conf.setStrings(ConfVars.PYTHON_CODE_DIR.varname, 
getClass().getResource("/python").toString());
 
     /* Since Travis CI limits the size of standard output log up to 4MB */

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index c01adb5..d383662 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -40,6 +40,7 @@ import org.apache.tajo.engine.function.builtin.SumInt;
 import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.plan.*;
 import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.plan.LogicalOptimizer;
 import org.apache.tajo.plan.LogicalPlan;
@@ -1334,5 +1335,4 @@ public class TestLogicalPlanner {
     assertEquals(alterTableNode.getPartitionValues()[1], "01");
     assertEquals(alterTableNode.getPartitionValues()[2], "11");
   }
-
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java
 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java
index ea1214e..4061b25 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java
@@ -33,13 +33,7 @@ import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
 import org.junit.Test;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeSet;
+import java.util.*;
 
 import static org.junit.Assert.assertArrayEquals;
 
@@ -80,7 +74,8 @@ public class TestTupleSorter {
     long[] time1 = new long[ITERATION];
     long[] time2 = new long[ITERATION];
     for(int iteration = 0; iteration < ITERATION; iteration++) {
-      List<Tuple> target = Arrays.asList(Arrays.copyOf(tuples, tuples.length));
+      TupleList target = new TupleList(tuples.length);
+      target.addAll(Arrays.asList(Arrays.copyOf(tuples, tuples.length)));
       Set<Integer> keys = new TreeSet<Integer>();
       for (int i = 0; i < MAX_SORT_KEY; i++) {
         keys.add(rnd.nextInt(schema.size()));

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java 
b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
index 3055362..a5caf38 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
@@ -26,8 +26,8 @@ import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.querymaster.Query;
 import org.apache.tajo.querymaster.QueryMasterTask;
-import org.apache.tajo.querymaster.Task;
 import org.apache.tajo.querymaster.Stage;
+import org.apache.tajo.querymaster.Task;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.TUtil;
@@ -565,12 +565,12 @@ public class TestGroupByQuery extends QueryTestCaseBase {
   }
 
   @Test
+  @Option(sort = true)
+  @SimpleTest
   public final void testHavingWithAggFunction() throws Exception {
     // select l_orderkey, avg(l_partkey) total, sum(l_linenumber) as num from 
lineitem group by l_orderkey
     // having avg(l_partkey) = 2.5 or num = 1;
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
+    runSimpleTests();
   }
 
   @Test
@@ -693,6 +693,8 @@ public class TestGroupByQuery extends QueryTestCaseBase {
 
   @Test
   public final void testNumShufflePartition() throws Exception {
+
+    Thread.sleep(5000);
     KeyValueSet tableOptions = new KeyValueSet();
     tableOptions.set(StorageConstants.TEXT_DELIMITER, 
StorageConstants.DEFAULT_FIELD_DELIMITER);
     tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
@@ -801,17 +803,17 @@ public class TestGroupByQuery extends QueryTestCaseBase {
   }
 
   @Test
+  @Option(sort = true)
+  @SimpleTest
   public final void testGroupbyWithPythonFunc() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
+    runSimpleTests();
   }
 
   @Test
+  @Option(sort = true)
+  @SimpleTest
   public final void testGroupbyWithPythonFunc2() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
+    runSimpleTests();
   }
 
   @Test
@@ -829,10 +831,10 @@ public class TestGroupByQuery extends QueryTestCaseBase {
   }
 
   @Test
+  @Option(sort = true)
+  @SimpleTest
   public final void testPythonUdaf3() throws Exception {
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
+    runSimpleTests();
   }
 
   // TODO: this test cannot be executed due to the bug of logical planner 
(TAJO-1588)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithJson.json
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithJson.json
 
b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithJson.json
index 7a60c64..072cf9b 100644
--- 
a/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithJson.json
+++ 
b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithJson.json
@@ -42,57 +42,86 @@
         }
     ],
     "Expr": {
-        "HavingCondition": {
-            "LeftExpr": {
-                "LeftExpr": {
-                    "ColumnName": "total",
+        "SortSpecs": [
+            {
+                "SortKey": {
+                    "ColumnName": "l_orderkey",
                     "OpType": "Column"
                 },
-                "RightExpr": {
-                    "Value": "2",
-                    "ValueType": "Unsigned_Integer",
-                    "OpType": "Literal"
+                "IsAsc": true,
+                "IsNullFirst": false
+            },
+            {
+                "SortKey": {
+                    "ColumnName": "total",
+                    "OpType": "Column"
                 },
-                "OpType": "GreaterThanOrEquals"
+                "IsAsc": true,
+                "IsNullFirst": false
             },
-            "RightExpr": {
-                "LeftExpr": {
+            {
+                "SortKey": {
                     "ColumnName": "num",
                     "OpType": "Column"
                 },
+                "IsAsc": true,
+                "IsNullFirst": false
+            }
+        ],
+        "Expr": {
+            "HavingCondition": {
+                "LeftExpr": {
+                    "LeftExpr": {
+                        "ColumnName": "total",
+                        "OpType": "Column"
+                    },
+                    "RightExpr": {
+                        "Value": "2",
+                        "ValueType": "Unsigned_Integer",
+                        "OpType": "Literal"
+                    },
+                    "OpType": "GreaterThanOrEquals"
+                },
                 "RightExpr": {
-                    "Value": "3",
-                    "ValueType": "Unsigned_Integer",
-                    "OpType": "Literal"
+                    "LeftExpr": {
+                        "ColumnName": "num",
+                        "OpType": "Column"
+                    },
+                    "RightExpr": {
+                        "Value": "3",
+                        "ValueType": "Unsigned_Integer",
+                        "OpType": "Literal"
+                    },
+                    "OpType": "Equals"
                 },
-                "OpType": "Equals"
+                "OpType": "Or"
             },
-            "OpType": "Or"
-        },
-        "Expr": {
-            "Groups": [
-                {
-                    "GroupType": "OrdinaryGroup",
-                    "Dimensions": [
-                        {
-                            "ColumnName": "l_orderkey",
-                            "OpType": "Column"
-                        }
-                    ]
-                }
-            ],
             "Expr": {
-                "Relations": [
+                "Groups": [
                     {
-                        "TableName": "lineitem",
-                        "OpType": "Relation"
+                        "GroupType": "OrdinaryGroup",
+                        "Dimensions": [
+                            {
+                                "ColumnName": "l_orderkey",
+                                "OpType": "Column"
+                            }
+                        ]
                     }
                 ],
-                "OpType": "RelationList"
+                "Expr": {
+                    "Relations": [
+                        {
+                            "TableName": "lineitem",
+                            "OpType": "Relation"
+                        }
+                    ],
+                    "OpType": "RelationList"
+                },
+                "OpType": "Aggregation"
             },
-            "OpType": "Aggregation"
+            "OpType": "Having"
         },
-        "OpType": "Having"
+        "OpType": "Sort"
     },
     "OpType": "Projection"
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithLimit3.sql
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithLimit3.sql
 
b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithLimit3.sql
index 1bb7c7f..fd1e5fb 100644
--- 
a/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithLimit3.sql
+++ 
b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithLimit3.sql
@@ -1 +1 @@
-select l_orderkey, sum(l_linenumber) from lineitem group by l_orderkey limit 1;
\ No newline at end of file
+select l_orderkey, sum(l_linenumber) from lineitem group by l_orderkey order 
by l_orderkey limit 1;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/resources/queries/TestSelectNestedRecord/testNestedFieldAsGroupbyKey1.sql
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/resources/queries/TestSelectNestedRecord/testNestedFieldAsGroupbyKey1.sql
 
b/tajo-core/src/test/resources/queries/TestSelectNestedRecord/testNestedFieldAsGroupbyKey1.sql
index 057ba05..c19e69e 100644
--- 
a/tajo-core/src/test/resources/queries/TestSelectNestedRecord/testNestedFieldAsGroupbyKey1.sql
+++ 
b/tajo-core/src/test/resources/queries/TestSelectNestedRecord/testNestedFieldAsGroupbyKey1.sql
@@ -4,4 +4,6 @@ SELECT
 FROM
   tweets
 GROUP BY
+  user.name
+order by
   user.name;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithJson.result
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithJson.result
 
b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithJson.result
index 366b76e..81f1bfd 100644
--- 
a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithJson.result
+++ 
b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithJson.result
@@ -1,5 +1,5 @@
 l_orderkey,total,num
 -------------------------------
-3,2.5,3
+1,1.0,3
 2,2.0,1
-1,1.0,3
\ No newline at end of file
+3,2.5,3
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithLimit3.result
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithLimit3.result
 
b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithLimit3.result
index cb1e141..e882158 100644
--- 
a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithLimit3.result
+++ 
b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithLimit3.result
@@ -1,3 +1,3 @@
 l_orderkey,?sum
 -------------------------------
-3,3
\ No newline at end of file
+1,3
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc.result
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc.result
 
b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc.result
index 2a5fb8a..1e8821c 100644
--- 
a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc.result
+++ 
b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc.result
@@ -1,7 +1,7 @@
 ?count
 -------------------------------
 4
+4
 5
 5
 5
-4
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc2.result
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc2.result
 
b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc2.result
index 08561fd..1413e41 100644
--- 
a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc2.result
+++ 
b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc2.result
@@ -1,7 +1,7 @@
 n_regionkey,cnt
 -------------------------------
 0,5
+1,5
+2,5
 3,5
 4,5
-2,5
-1,5
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/resources/results/TestGroupByQuery/testHavingWithAggFunction.result
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/resources/results/TestGroupByQuery/testHavingWithAggFunction.result
 
b/tajo-core/src/test/resources/results/TestGroupByQuery/testHavingWithAggFunction.result
index b8369d2..fcd604e 100644
--- 
a/tajo-core/src/test/resources/results/TestGroupByQuery/testHavingWithAggFunction.result
+++ 
b/tajo-core/src/test/resources/results/TestGroupByQuery/testHavingWithAggFunction.result
@@ -1,4 +1,4 @@
 l_orderkey,total,num
 -------------------------------
+2,2.0,1
 3,2.5,3
-2,2.0,1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/resources/results/TestGroupByQuery/testPythonUdaf3.result
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/resources/results/TestGroupByQuery/testPythonUdaf3.result 
b/tajo-core/src/test/resources/results/TestGroupByQuery/testPythonUdaf3.result
index 0607720..7338988 100644
--- 
a/tajo-core/src/test/resources/results/TestGroupByQuery/testPythonUdaf3.result
+++ 
b/tajo-core/src/test/resources/results/TestGroupByQuery/testPythonUdaf3.result
@@ -1,5 +1,5 @@
 ?avgpy,?countpy_1,?avg_2,?count_3
 -------------------------------
+173665.47,1,173665.47,1
 193846.25,1,193846.25,1
 46929.18,1,46929.18,1
-173665.47,1,173665.47,1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/resources/results/TestSelectNestedRecord/testNestedFieldAsGroupbyKey1.result
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/resources/results/TestSelectNestedRecord/testNestedFieldAsGroupbyKey1.result
 
b/tajo-core/src/test/resources/results/TestSelectNestedRecord/testNestedFieldAsGroupbyKey1.result
index debf06e..d881f87 100644
--- 
a/tajo-core/src/test/resources/results/TestSelectNestedRecord/testNestedFieldAsGroupbyKey1.result
+++ 
b/tajo-core/src/test/resources/results/TestSelectNestedRecord/testNestedFieldAsGroupbyKey1.result
@@ -1,6 +1,6 @@
 user/name,total_retweet
 -------------------------------
 Chaz Martenstein,2
-Thomas John Wakeman,3
+Marty Elmer,4
 Sean Cummings,1
-Marty Elmer,4
\ No newline at end of file
+Thomas John Wakeman,3
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/resources/results/TestWindowQuery/testWindowWithAggregation4.result
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/resources/results/TestWindowQuery/testWindowWithAggregation4.result
 
b/tajo-core/src/test/resources/results/TestWindowQuery/testWindowWithAggregation4.result
index c34577c..fffa2dd 100644
--- 
a/tajo-core/src/test/resources/results/TestWindowQuery/testWindowWithAggregation4.result
+++ 
b/tajo-core/src/test/resources/results/TestWindowQuery/testWindowWithAggregation4.result
@@ -1,5 +1,5 @@
 l_orderkey,cnt,row_num
 -------------------------------
-3,2,1
-1,2,2
+1,2,1
+3,2,2
 2,1,3
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/resources/results/TestWindowQuery/testWindowWithAggregation6.result
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/resources/results/TestWindowQuery/testWindowWithAggregation6.result
 
b/tajo-core/src/test/resources/results/TestWindowQuery/testWindowWithAggregation6.result
index 7dd39ac..c2e02d5 100644
--- 
a/tajo-core/src/test/resources/results/TestWindowQuery/testWindowWithAggregation6.result
+++ 
b/tajo-core/src/test/resources/results/TestWindowQuery/testWindowWithAggregation6.result
@@ -1,5 +1,5 @@
 l_orderkey,cnt,row_num
 -------------------------------
-1,2,2
+1,2,1
 2,1,3
-3,2,1
\ No newline at end of file
+3,2,2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java 
b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java
index 9ae1837..fad285c 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java
@@ -93,6 +93,11 @@ public class MetaDataTuple implements Tuple {
   }
 
   @Override
+  public void clearOffset() {
+    throw new UnsupportedException("clearOffset");
+  }
+
+  @Override
   public Datum asDatum(int fieldId) {
     return values.get(fieldId);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
index 5011d87..4a208c2 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
@@ -1299,7 +1299,7 @@ public class LogicalPlanner extends 
BaseAlgebraVisitor<LogicalPlanner.PlanContex
     QueryBlock block = context.queryBlock;
 
     ScanNode scanNode = block.getNodeFromExpr(expr);
-    updatePhysicalInfo(scanNode.getTableDesc());
+    updatePhysicalInfo(context, scanNode.getTableDesc());
 
     // Find expression which can be evaluated at this relation node.
     // Except for column references, additional expressions used in select 
list, where clause, order-by clauses
@@ -1358,12 +1358,12 @@ public class LogicalPlanner extends 
BaseAlgebraVisitor<LogicalPlanner.PlanContex
     return targets;
   }
 
-  private void updatePhysicalInfo(TableDesc desc) {
+  private void updatePhysicalInfo(PlanContext planContext, TableDesc desc) {
     if (desc.getUri() != null &&
         desc.getMeta().getStoreType() != "SYSTEM" && 
PlannerUtil.isFileStorageType(desc.getMeta().getStoreType())) {
       try {
         Path path = new Path(desc.getUri());
-        FileSystem fs = path.getFileSystem(new Configuration());
+        FileSystem fs = path.getFileSystem(planContext.queryContext.getConf());
         FileStatus status = fs.getFileStatus(path);
         if (desc.getStats() != null && (status.isDirectory() || 
status.isFile())) {
           ContentSummary summary = fs.getContentSummary(path);

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-plan/src/main/java/org/apache/tajo/plan/TablePropertyUtil.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/TablePropertyUtil.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/TablePropertyUtil.java
index 5576889..2fe0cbe 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/TablePropertyUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/TablePropertyUtil.java
@@ -44,8 +44,6 @@ public class TablePropertyUtil {
     if (storeType.equalsIgnoreCase("CSV") || 
storeType.equalsIgnoreCase("TEXT")) {
       setSessionToProperty(context, SessionVars.NULL_CHAR, property, 
StorageConstants.TEXT_NULL);
     }
-
-    setSessionToProperty(context, SessionVars.TIMEZONE, property, 
StorageConstants.TIMEZONE);
   }
 
   private static void setSessionToProperty(OverridableConf context,

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
index ed53832..de39d08 100644
--- 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
@@ -125,6 +125,11 @@ public class FrameTuple implements Tuple, Cloneable {
   }
 
   @Override
+  public void clearOffset() {
+    throw new UnsupportedException();
+  }
+
+  @Override
   public void setOffset(long offset) {
     throw new UnsupportedException();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java
index 7bfc166..d4597fa 100644
--- 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java
@@ -29,6 +29,7 @@ import org.apache.tajo.util.datetime.TimeMeta;
 
 import java.util.Arrays;
 
+@Deprecated
 public class LazyTuple implements Tuple, Cloneable {
   private long offset;
   private Datum[] values;
@@ -122,6 +123,11 @@ public class LazyTuple implements Tuple, Cloneable {
     return get(fieldId).size();
   }
 
+  @Override
+  public void clearOffset() {
+    this.offset = -1;
+  }
+
   //////////////////////////////////////////////////////
   // Getter
   //////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
index ba9f873..51cedb2 100644
--- 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
@@ -45,7 +45,7 @@ public class RowStoreUtil {
   }
 
   public static Tuple project(Tuple in, Tuple out, int[] targetIds) {
-    out.clear();
+    out.clearOffset();
     for (int idx = 0; idx < targetIds.length; idx++) {
       out.put(idx, in.asDatum(targetIds[idx]));
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
index 1f43ef8..8f36b35 100644
--- 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
@@ -65,6 +65,10 @@ public class HeapTuple implements Tuple {
     return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
   }
 
+  @Override
+  public void clearOffset() {
+  }
+
   public ByteBuffer nioBuffer() {
     return ByteBuffer.wrap(data);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
index e7bd2aa..4ccba7b 100644
--- 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
@@ -186,6 +186,10 @@ public abstract class UnSafeTuple implements Tuple {
   }
 
   @Override
+  public void clearOffset() {
+  }
+
+  @Override
   public void setOffset(long offset) {
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
 
b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
index 11851ec..1626526 100644
--- 
a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
+++ 
b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
@@ -85,6 +85,8 @@ public class HBaseScanner implements Scanner {
   private int[] rowKeyFieldIndexes;
   private char rowKeyDelimiter;
 
+  private Tuple outTuple;
+
   public HBaseScanner (Configuration conf, Schema schema, TableMeta meta, 
Fragment fragment) throws IOException {
     Preconditions.checkNotNull(conf);
     Preconditions.checkNotNull(schema);
@@ -119,6 +121,8 @@ public class HBaseScanner implements Scanner {
       targets = schema.toArray();
     }
 
+    outTuple = new VTuple(targets.length);
+
     columnMapping = new ColumnMapping(schema, meta.getOptions());
     targetIndexes = new int[targets.length];
     int index = 0;
@@ -206,12 +210,11 @@ public class HBaseScanner implements Scanner {
     }
 
     Result result = scanResults[scanResultIndex++];
-    Tuple resultTuple = new VTuple(targetIndexes.length);
     for (int i = 0; i < targetIndexes.length; i++) {
-      resultTuple.put(i, getDatum(result, targetIndexes[i]));
+      outTuple.put(i, getDatum(result, targetIndexes[i]));
     }
     numRows++;
-    return resultTuple;
+    return outTuple;
   }
 
   private Datum getDatum(Result result, int fieldId) throws IOException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
index 8b8ca76..e55e34b 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -18,12 +18,14 @@
 
 package org.apache.tajo.storage;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.*;
@@ -40,8 +42,10 @@ import org.apache.tajo.storage.compress.CodecPool;
 import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+import org.apache.tajo.storage.text.ByteBufLineReader;
+import org.apache.tajo.storage.text.DelimitedTextFile;
+import org.apache.tajo.storage.text.TextLineDeserializer;
 import org.apache.tajo.util.Bytes;
-import org.apache.tajo.util.BytesUtils;
 
 import java.io.*;
 import java.util.ArrayList;
@@ -50,7 +54,6 @@ import java.util.Arrays;
 public class CSVFile {
 
   public static final byte LF = '\n';
-  public static int EOF = -1;
 
   private static final Log LOG = LogFactory.getLog(CSVFile.class);
 
@@ -262,26 +265,9 @@ public class CSVFile {
       if (codec == null || codec instanceof SplittableCompressionCodec) {
         splittable = true;
       }
-
-      //Delimiter
-      this.delimiter = StringEscapeUtils.unescapeJava(
-          meta.getOption(StorageConstants.TEXT_DELIMITER,
-          meta.getOption(StorageConstants.CSVFILE_DELIMITER, 
StorageConstants.DEFAULT_FIELD_DELIMITER)))
-          .getBytes(Bytes.UTF8_CHARSET);
-
-      String nullCharacters = StringEscapeUtils.unescapeJava(
-          meta.getOption(StorageConstants.TEXT_NULL,
-          meta.getOption(StorageConstants.CSVFILE_NULL, 
NullDatum.DEFAULT_TEXT)));
-
-      if (StringUtils.isEmpty(nullCharacters)) {
-        nullChars = NullDatum.get().asTextBytes();
-      } else {
-        nullChars = nullCharacters.getBytes(Bytes.UTF8_CHARSET);
-      }
     }
 
     private final static int DEFAULT_PAGE_SIZE = 256 * 1024;
-    private byte[] delimiter;
     private FileSystem fs;
     private FSDataInputStream fis;
     private InputStream is; //decompressd stream
@@ -294,13 +280,14 @@ public class CSVFile {
     private int currentIdx = 0, validIdx = 0, recordCount = 0;
     private int[] targetColumnIndexes;
     private boolean eof = false;
-    private final byte[] nullChars;
     private SplitLineReader reader;
     private ArrayList<Long> fileOffsets;
     private ArrayList<Integer> rowLengthList;
     private ArrayList<Integer> startOffsets;
     private NonSyncByteArrayOutputStream buffer;
-    private SerializerDeserializer serde;
+    private Tuple outTuple;
+    private TextLineDeserializer deserializer;
+    private ByteBuf byteBuf = 
BufferPool.directBuffer(ByteBufLineReader.DEFAULT_BUFFER);
 
     @Override
     public void init() throws IOException {
@@ -347,22 +334,15 @@ public class CSVFile {
         targets = schema.toArray();
       }
 
+      outTuple = new VTuple(targets.length);
+      deserializer = 
DelimitedTextFile.getLineSerde(meta).createDeserializer(schema, meta, targets);
+      deserializer.init();
+
       targetColumnIndexes = new int[targets.length];
       for (int i = 0; i < targets.length; i++) {
         targetColumnIndexes[i] = 
schema.getColumnId(targets[i].getQualifiedName());
       }
 
-      try {
-        //FIXME
-        String serdeClass = this.meta.getOption(StorageConstants.CSVFILE_SERDE,
-            TextSerializerDeserializer.class.getName());
-        serde = (SerializerDeserializer) 
Class.forName(serdeClass).newInstance();
-        serde.init(schema);
-      } catch (Exception e) {
-        LOG.error(e.getMessage(), e);
-        throw new IOException(e);
-      }
-
       super.init();
       Arrays.sort(targetColumnIndexes);
       if (LOG.isDebugEnabled()) {
@@ -396,7 +376,7 @@ public class CSVFile {
     }
 
     private void page() throws IOException {
-//      // Index initialization
+      // Index initialization
       currentIdx = 0;
       validIdx = 0;
       int currentBufferPos = 0;
@@ -473,15 +453,13 @@ public class CSVFile {
           }
         }
 
-        long offset = -1;
-        if(!isCompress()){
-          offset = fileOffsets.get(currentIdx);
-        }
+        byteBuf.clear();
+        byteBuf.writeBytes(buffer.getData(), startOffsets.get(currentIdx), 
rowLengthList.get(currentIdx));
+
+        deserializer.deserialize(byteBuf, outTuple);
 
-        byte[][] cells = BytesUtils.splitPreserveAllTokens(buffer.getData(), 
startOffsets.get(currentIdx),
-            rowLengthList.get(currentIdx), delimiter, targetColumnIndexes, 
schema.size());
         currentIdx++;
-        return new LazyTuple(schema, cells, offset, nullChars, serde);
+        return outTuple;
       } catch (Throwable t) {
         LOG.error("Tuple list length: " + (fileOffsets != null ? 
fileOffsets.size() : 0), t);
         LOG.error("Tuple list current index: " + currentIdx, t);
@@ -523,12 +501,16 @@ public class CSVFile {
           CodecPool.returnDecompressor(decompressor);
           decompressor = null;
         }
+        outTuple = null;
+        if (this.byteBuf.refCnt() > 0) {
+          this.byteBuf.release();
+        }
       }
     }
 
     @Override
     public boolean isProjectable() {
-      return false;
+      return true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
index 3b655be..e3594f9 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -58,7 +58,7 @@ public class RawFile {
 
     private ByteBuffer buffer;
     private ByteBuf buf;
-    private Tuple tuple;
+    private Tuple outTuple;
 
     private int headerSize = 0; // Header size of a tuple
     private BitArray nullFlags;
@@ -105,7 +105,7 @@ public class RawFile {
         columnTypes[i] = schema.getColumn(i).getDataType();
       }
 
-      tuple = new VTuple(columnTypes.length);
+      outTuple = new VTuple(columnTypes.length);
       nullFlags = new BitArray(schema.size());
       headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength(); // The middle 2 
bytes is for NullFlagSize
 
@@ -274,51 +274,51 @@ public class RawFile {
       for (int i = 0; i < columnTypes.length; i++) {
         // check if the i'th column is null
         if (nullFlags.get(i)) {
-          tuple.put(i, DatumFactory.createNullDatum());
+          outTuple.put(i, DatumFactory.createNullDatum());
           continue;
         }
 
         switch (columnTypes[i].getType()) {
           case BOOLEAN :
-            tuple.put(i, DatumFactory.createBool(buffer.get()));
+            outTuple.put(i, DatumFactory.createBool(buffer.get()));
             break;
 
           case BIT :
-            tuple.put(i, DatumFactory.createBit(buffer.get()));
+            outTuple.put(i, DatumFactory.createBit(buffer.get()));
             break;
 
           case CHAR :
             int realLen = readRawVarint32();
             byte[] buf = new byte[realLen];
             buffer.get(buf);
-            tuple.put(i, DatumFactory.createChar(buf));
+            outTuple.put(i, DatumFactory.createChar(buf));
             break;
 
           case INT2 :
-            tuple.put(i, DatumFactory.createInt2(buffer.getShort()));
+            outTuple.put(i, DatumFactory.createInt2(buffer.getShort()));
             break;
 
           case INT4 :
-            tuple.put(i, 
DatumFactory.createInt4(decodeZigZag32(readRawVarint32())));
+            outTuple.put(i, 
DatumFactory.createInt4(decodeZigZag32(readRawVarint32())));
             break;
 
           case INT8 :
-            tuple.put(i, 
DatumFactory.createInt8(decodeZigZag64(readRawVarint64())));
+            outTuple.put(i, 
DatumFactory.createInt8(decodeZigZag64(readRawVarint64())));
             break;
 
           case FLOAT4 :
-            tuple.put(i, DatumFactory.createFloat4(buffer.getFloat()));
+            outTuple.put(i, DatumFactory.createFloat4(buffer.getFloat()));
             break;
 
           case FLOAT8 :
-            tuple.put(i, DatumFactory.createFloat8(buffer.getDouble()));
+            outTuple.put(i, DatumFactory.createFloat8(buffer.getDouble()));
             break;
 
           case TEXT : {
             int len = readRawVarint32();
             byte [] strBytes = new byte[len];
             buffer.get(strBytes);
-            tuple.put(i, DatumFactory.createText(strBytes));
+            outTuple.put(i, DatumFactory.createText(strBytes));
             break;
           }
 
@@ -326,7 +326,7 @@ public class RawFile {
             int len = readRawVarint32();
             byte [] rawBytes = new byte[len];
             buffer.get(rawBytes);
-            tuple.put(i, DatumFactory.createBlob(rawBytes));
+            outTuple.put(i, DatumFactory.createBlob(rawBytes));
             break;
           }
 
@@ -338,22 +338,22 @@ public class RawFile {
             ProtobufDatumFactory factory = 
ProtobufDatumFactory.get(columnTypes[i]);
             Message.Builder builder = factory.newBuilder();
             builder.mergeFrom(rawBytes);
-            tuple.put(i, factory.createDatum(builder.build()));
+            outTuple.put(i, factory.createDatum(builder.build()));
             break;
           }
 
           case INET4 :
             byte [] ipv4Bytes = new byte[4];
             buffer.get(ipv4Bytes);
-            tuple.put(i, DatumFactory.createInet4(ipv4Bytes));
+            outTuple.put(i, DatumFactory.createInet4(ipv4Bytes));
             break;
 
           case DATE: {
             int val = buffer.getInt();
             if (val < Integer.MIN_VALUE + 1) {
-              tuple.put(i, DatumFactory.createNullDatum());
+              outTuple.put(i, DatumFactory.createNullDatum());
             } else {
-              tuple.put(i, DatumFactory.createFromInt4(columnTypes[i], val));
+              outTuple.put(i, DatumFactory.createFromInt4(columnTypes[i], 
val));
             }
             break;
           }
@@ -361,14 +361,14 @@ public class RawFile {
           case TIMESTAMP: {
             long val = buffer.getLong();
             if (val < Long.MIN_VALUE + 1) {
-              tuple.put(i, DatumFactory.createNullDatum());
+              outTuple.put(i, DatumFactory.createNullDatum());
             } else {
-              tuple.put(i, DatumFactory.createFromInt8(columnTypes[i], val));
+              outTuple.put(i, DatumFactory.createFromInt8(columnTypes[i], 
val));
             }
             break;
           }
           case NULL_TYPE:
-            tuple.put(i, NullDatum.get());
+            outTuple.put(i, NullDatum.get());
             break;
 
           default:
@@ -380,7 +380,7 @@ public class RawFile {
       if(filePosition - buffer.remaining() >= endOffset){
         eos = true;
       }
-      return new VTuple(tuple);
+      return outTuple;
     }
 
     private void reSizeBuffer(int writableBytes){

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
index 2be2ec0..ef597ea 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
@@ -119,6 +119,8 @@ public class RowFile {
         buffer.flip();
       }
 
+      tuple = new VTuple(schema.size());
+
       super.init();
     }
 
@@ -182,7 +184,6 @@ public class RowFile {
       }
 
       int i;
-      tuple = new VTuple(schema.size());
 
       int nullFlagSize = buffer.getShort();
       byte[] nullFlagBytes = new byte[nullFlagSize];

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
index 729c237..217acba 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
@@ -54,6 +54,7 @@ public class AvroScanner extends FileScanner {
   private List<Schema.Field> avroFields;
   private DataFileReader<GenericRecord> dataFileReader;
   private int[] projectionMap;
+  private Tuple outTuple;
 
   /**
    * Creates a new AvroScanner.
@@ -78,6 +79,7 @@ public class AvroScanner extends FileScanner {
       targets = schema.toArray();
     }
     prepareProjection(targets);
+    outTuple = new VTuple(projectionMap.length);
 
     avroSchema = AvroUtil.getAvroSchema(meta, conf);
     avroFields = avroSchema.getFields();
@@ -176,13 +178,12 @@ public class AvroScanner extends FileScanner {
       return null;
     }
 
-    Tuple tuple = new VTuple(projectionMap.length);
     GenericRecord record = dataFileReader.next();
     for (int i = 0; i < projectionMap.length; ++i) {
       int columnIndex = projectionMap[i];
       Object value = record.get(columnIndex);
       if (value == null) {
-        tuple.put(i, NullDatum.get());
+        outTuple.put(i, NullDatum.get());
         continue;
       }
 
@@ -197,28 +198,28 @@ public class AvroScanner extends FileScanner {
       TajoDataTypes.Type tajoType = dataType.getType();
       switch (avroType) {
         case NULL:
-          tuple.put(i, NullDatum.get());
+          outTuple.put(i, NullDatum.get());
           break;
         case BOOLEAN:
-          tuple.put(i, DatumFactory.createBool((Boolean)value));
+          outTuple.put(i, DatumFactory.createBool((Boolean) value));
           break;
         case INT:
-          tuple.put(i, convertInt(value, tajoType));
+          outTuple.put(i, convertInt(value, tajoType));
           break;
         case LONG:
-          tuple.put(i, DatumFactory.createInt8((Long)value));
+          outTuple.put(i, DatumFactory.createInt8((Long) value));
           break;
         case FLOAT:
-          tuple.put(i, DatumFactory.createFloat4((Float)value));
+          outTuple.put(i, DatumFactory.createFloat4((Float) value));
           break;
         case DOUBLE:
-          tuple.put(i, DatumFactory.createFloat8((Double)value));
+          outTuple.put(i, DatumFactory.createFloat8((Double) value));
           break;
         case BYTES:
-          tuple.put(i, convertBytes(value, tajoType, dataType));
+          outTuple.put(i, convertBytes(value, tajoType, dataType));
           break;
         case STRING:
-          tuple.put(i, convertString(value, tajoType));
+          outTuple.put(i, convertString(value, tajoType));
           break;
         case RECORD:
           throw new RuntimeException("Avro RECORD not supported.");
@@ -229,13 +230,13 @@ public class AvroScanner extends FileScanner {
         case UNION:
           throw new RuntimeException("Avro UNION not supported.");
         case FIXED:
-          tuple.put(i, new BlobDatum(((GenericFixed)value).bytes()));
+          outTuple.put(i, new BlobDatum(((GenericFixed) value).bytes()));
           break;
         default:
           throw new RuntimeException("Unknown type.");
       }
     }
-    return tuple;
+    return outTuple;
   }
 
   /**
@@ -253,6 +254,7 @@ public class AvroScanner extends FileScanner {
     if (dataFileReader != null) {
       dataFileReader.close();
     }
+    outTuple = null;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
index 81a1ffd..ba81c3e 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
@@ -126,15 +126,21 @@ public class BSTIndex implements IndexMethod {
     }
 
     @Override
-    public void write(Tuple key, long offset) throws IOException {
-      if (firstKey == null || compartor.compare(key, firstKey) < 0) {
-        firstKey = key;
+    public void write(final Tuple key, final long offset) throws IOException {
+      Tuple keyTuple;
+      try {
+        keyTuple = key.clone();
+      } catch (CloneNotSupportedException e) {
+        throw new IOException(e);
+      }
+      if (firstKey == null || compartor.compare(keyTuple, firstKey) < 0) {
+        firstKey = keyTuple;
       }
-      if (lastKey == null || compartor.compare(lastKey, key) < 0) {
-        lastKey = key;
+      if (lastKey == null || compartor.compare(lastKey, keyTuple) < 0) {
+        lastKey = keyTuple;
       }
 
-      collector.put(key, offset);
+      collector.put(keyTuple, offset);
     }
 
     public TupleComparator getComparator() {
@@ -253,7 +259,7 @@ public class BSTIndex implements IndexMethod {
         map = new TreeMap<Tuple, LinkedList<Long>>(comparator);
       }
 
-      public void put(Tuple key, long offset) {
+      public void put(final Tuple key, final long offset) {
         if (map.containsKey(key)) {
           map.get(key).add(offset);
         } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index 286ee3a..47d0267 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -1182,6 +1182,8 @@ public class RCFile {
     private byte[] nullChars;
     private SerializerDeserializer serde;
 
+    private Tuple outTuple;
+
     public RCFileScanner(Configuration conf, final Schema schema, final 
TableMeta meta,
                          final Fragment fragment) throws IOException {
       super(conf, schema, meta, fragment);
@@ -1214,6 +1216,8 @@ public class RCFile {
         targets = schema.toArray();
       }
 
+      outTuple = new VTuple(targets.length);
+
       targetColumnIndexes = new int[targets.length];
       for (int i = 0; i < targets.length; i++) {
         targetColumnIndexes[i] = 
schema.getColumnId(targets[i].getQualifiedName());
@@ -1641,9 +1645,8 @@ public class RCFile {
         return null;
       }
 
-      Tuple tuple = new VTuple(targets.length);
-      getCurrentRow(tuple);
-      return tuple;
+      getCurrentRow(outTuple);
+      return outTuple;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
index 340e2fa..bac8779 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.storage.sequencefile;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -35,7 +36,10 @@ import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.BytesUtils;
+import org.apache.tajo.storage.text.ByteBufLineReader;
+import org.apache.tajo.storage.text.DelimitedTextFile;
+import org.apache.tajo.storage.text.TextLineDeserializer;
+import org.apache.tajo.storage.text.TextLineParsingError;
 
 import java.io.IOException;
 
@@ -73,6 +77,11 @@ public class SequenceFileScanner extends FileScanner {
 
   private Writable EMPTY_KEY;
 
+  private TextLineDeserializer deserializer;
+  private ByteBuf byteBuf = 
BufferPool.directBuffer(ByteBufLineReader.DEFAULT_BUFFER);
+
+  private Tuple outTuple;
+
   public SequenceFileScanner(Configuration conf, Schema schema, TableMeta 
meta, Fragment fragment) throws IOException {
     super(conf, schema, meta, fragment);
   }
@@ -109,6 +118,9 @@ public class SequenceFileScanner extends FileScanner {
       targets = schema.toArray();
     }
 
+    outTuple = new VTuple(targets.length);
+    deserializer = 
DelimitedTextFile.getLineSerde(meta).createDeserializer(schema, meta, targets);
+    deserializer.init();
 
     fieldIsNull = new boolean[schema.getRootColumns().size()];
     fieldStart = new int[schema.getRootColumns().size()];
@@ -149,6 +161,8 @@ public class SequenceFileScanner extends FileScanner {
     }
   }
 
+  Text text = new Text();
+
   @Override
   public Tuple next() throws IOException {
     if (!more) return null;
@@ -163,24 +177,25 @@ public class SequenceFileScanner extends FileScanner {
     }
 
     if (more) {
-      Tuple tuple = null;
-      byte[][] cells;
-
       if (hasBinarySerDe) {
         BytesWritable bytesWritable = new BytesWritable();
         reader.getCurrentValue(bytesWritable);
-        tuple = makeTuple(bytesWritable);
+        makeTuple(bytesWritable);
         totalBytes += (long)bytesWritable.getBytes().length;
       } else {
-        Text text = new Text();
         reader.getCurrentValue(text);
-        cells = BytesUtils.splitPreserveAllTokens(text.getBytes(), 
-            delimiter, projectionMap, schema.getRootColumns().size());
-        totalBytes += (long)text.getBytes().length;
-        tuple = new LazyTuple(schema, cells, 0, nullChars, serde);
+
+        byteBuf.clear();
+        byteBuf.writeBytes(text.getBytes(), 0, text.getLength());
+
+        try {
+          deserializer.deserialize(byteBuf, outTuple);
+        } catch (TextLineParsingError e) {
+          throw new IOException(e);
+        }
       }
       currentIdx++;
-      return tuple;
+      return outTuple;
     } else {
       return null;
     }
@@ -200,8 +215,6 @@ public class SequenceFileScanner extends FileScanner {
    * So, tajo must make a tuple after parsing hive style BinarySerDe.
    */
   private Tuple makeTuple(BytesWritable value) throws IOException{
-    Tuple tuple = new VTuple(schema.getRootColumns().size());
-
     int start = 0;
     int length = value.getLength();
 
@@ -229,7 +242,7 @@ public class SequenceFileScanner extends FileScanner {
         for (int j = 0; j < projectionMap.length; j++) {
           if (projectionMap[j] == i) {
             Datum datum = serde.deserialize(i, bytes, fieldStart[i], 
fieldLength[i], nullChars);
-            tuple.put(i, datum);
+            outTuple.put(i, datum);
           }
         }
       }
@@ -247,7 +260,7 @@ public class SequenceFileScanner extends FileScanner {
       }
     }
 
-    return tuple;
+    return outTuple;
   }
 
   /**
@@ -321,11 +334,16 @@ public class SequenceFileScanner extends FileScanner {
       tableStats.setReadBytes(totalBytes);
       tableStats.setNumRows(currentIdx);
     }
+
+    outTuple = null;
+    if (this.byteBuf.refCnt() > 0) {
+      this.byteBuf.release();
+    }
   }
 
   @Override
   public boolean isProjectable() {
-    return false;
+    return true;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
index e23e8f8..3c667bf 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
@@ -29,7 +29,7 @@ import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class ByteBufLineReader implements Closeable {
-  private static int DEFAULT_BUFFER = 64 * 1024;
+  public static int DEFAULT_BUFFER = 64 * 1024;
 
   private int bufferSize;
   private long readBytes;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
index 21984f2..3445676 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
@@ -21,6 +21,7 @@ package org.apache.tajo.storage.text;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.storage.FieldSerializerDeserializer;
+import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.storage.Tuple;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index fdeba4e..c68690a 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -286,6 +286,8 @@ public class DelimitedTextFile {
     /** How many errors have occurred? */
     private int errorNum;
 
+    private VTuple outTuple;
+
     public DelimitedTextFileScanner(Configuration conf, final Schema schema, 
final TableMeta meta,
                                     final Fragment fragment)
         throws IOException {
@@ -321,6 +323,8 @@ public class DelimitedTextFile {
         targets = schema.toArray();
       }
 
+      outTuple = new VTuple(targets.length);
+
       super.init();
       if (LOG.isDebugEnabled()) {
         LOG.debug("DelimitedTextFileScanner open:" + fragment.getPath() + "," 
+ startOffset + "," + endOffset);
@@ -375,7 +379,6 @@ public class DelimitedTextFile {
 
     @Override
     public Tuple next() throws IOException {
-      VTuple tuple;
 
       if (!reader.isReadable()) {
         return null;
@@ -400,11 +403,10 @@ public class DelimitedTextFile {
             return EmptyTuple.get();
           }
 
-          tuple = new VTuple(targets.length);
-          tuple.setOffset(offset);
+          outTuple.setOffset(offset);
 
           try {
-            deserializer.deserialize(buf, tuple);
+            deserializer.deserialize(buf, outTuple);
             // if a line is read normally, it exits this loop.
             break;
 
@@ -429,7 +431,7 @@ public class DelimitedTextFile {
         // recordCount means the number of actual read records. We increment 
the count here.
         recordCount++;
 
-        return tuple;
+        return outTuple;
 
       } catch (Throwable t) {
         LOG.error(t);
@@ -459,6 +461,7 @@ public class DelimitedTextFile {
       } finally {
         IOUtils.cleanup(LOG, reader);
         reader = null;
+        outTuple = null;
       }
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
index c09a83b..4cdf7df 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
@@ -42,7 +42,12 @@ public abstract class TextLineSerDe {
   public abstract TextLineSerializer createSerializer(Schema schema, TableMeta 
meta);
 
   public static ByteBuf getNullChars(TableMeta meta) {
-    byte[] nullCharByteArray = getNullCharsAsBytes(meta);
+    byte[] nullCharByteArray;
+    if (meta.getStoreType().equals("SEQUENCEFILE")) {
+      nullCharByteArray = getNullCharsAsBytes(meta, 
StorageConstants.SEQUENCEFILE_NULL, "\\");
+    } else {
+      nullCharByteArray = getNullCharsAsBytes(meta);
+    }
 
     ByteBuf nullChars = BufferPool.directBuffer(nullCharByteArray.length, 
nullCharByteArray.length);
     nullChars.writeBytes(nullCharByteArray);
@@ -51,10 +56,13 @@ public abstract class TextLineSerDe {
   }
 
   public static byte [] getNullCharsAsBytes(TableMeta meta) {
+    return getNullCharsAsBytes(meta, StorageConstants.TEXT_NULL, 
NullDatum.DEFAULT_TEXT);
+  }
+
+  public static byte[] getNullCharsAsBytes(TableMeta meta, String key, String 
defaultVal) {
     byte [] nullChars;
 
-    String nullCharacters = 
StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL,
-        NullDatum.DEFAULT_TEXT));
+    String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(key, 
defaultVal));
     if (StringUtils.isEmpty(nullCharacters)) {
       nullChars = NullDatum.get().asTextBytes();
     } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
index 35a5ea4..0ba75dd 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -109,8 +109,7 @@ public class TestMergeScanner {
     TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
     meta.setOptions(CatalogUtil.newDefaultProperty(storeType));
     if (storeType.equalsIgnoreCase("AVRO")) {
-      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
-                     TEST_MULTIPLE_FILES_AVRO_SCHEMA);
+      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, 
TEST_MULTIPLE_FILES_AVRO_SCHEMA);
     }
 
     Path table1Path = new Path(testDir, storeType + "_1.data");
@@ -201,6 +200,8 @@ public class TestMergeScanner {
   private static boolean isProjectableStorage(String type) {
     if (type.equalsIgnoreCase("RCFILE") ||
         type.equalsIgnoreCase("PARQUET") ||
+        type.equalsIgnoreCase("CSV") ||
+        type.equalsIgnoreCase("SEQUENCEFILE") ||
         type.equalsIgnoreCase("AVRO")) {
       return true;
     } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index b53dbec..c3f31a0 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -369,6 +369,7 @@ public class TestStorages {
         DatumFactory.createInet4("192.168.0.1"),
         NullDatum.get()
     });
+
     if (handleProtobuf) {
       tuple.put(11, factory.createDatum(queryid.getProto()));
     }
@@ -420,8 +421,7 @@ public class TestStorages {
     meta.putOption(StorageConstants.RCFILE_SERDE, 
TextSerializerDeserializer.class.getName());
     meta.putOption(StorageConstants.SEQUENCEFILE_NULL, "\\");
     if (storeType.equalsIgnoreCase("AVRO")) {
-      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
-                     TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA);
+      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, 
TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA);
     }
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");

Reply via email to