Repository: tajo
Updated Branches:
  refs/heads/master 45100ced2 -> 9afd9abe3


http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RadixSort.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RadixSort.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RadixSort.java
new file mode 100644
index 0000000..e804941
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RadixSort.java
@@ -0,0 +1,921 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import io.netty.util.internal.PlatformDependent;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.common.type.TajoTypeUtil;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.tuple.memory.UnSafeTuple;
+import org.apache.tajo.tuple.memory.UnSafeTupleList;
+import org.apache.tajo.util.SizeOf;
+import sun.misc.Contended;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ *
+ * Radix sort implementation (https://en.wikipedia.org/wiki/Radix_sort).
+ * This implementation uses the hybrid approach which consists of MSD radix 
sort and Tim sort.
+ *
+ * It first organizes the given tuples into several bins which have the same 
radix key. For each bin, it groups
+ * it again using MSD radix sort if the length of the bin is sufficiently 
large. Otherwise, it simply sorts that bin
+ * using Tim sort.
+ *
+ */
+public class RadixSort {
+
+  private static final Log LOG = LogFactory.getLog(RadixSort.class);
+
+  private static class RadixSortContext {
+    @Contended UnSafeTuple[] in;
+    @Contended UnSafeTuple[] out;
+    @Contended final int[] keys;
+
+    final int[] sortKeyIds;
+    final int maxSortKeyId;
+    final Type[] sortKeyTypes;
+    final boolean[] asc;
+    final boolean[] nullFirst;
+    final Comparator<UnSafeTuple> comparator;
+
+    // If the number of tuples to be sorted does not exceed this value, Tim 
sort is used.
+    // The default value is 65536 which is got from some experiments.
+    final int timSortThreshold;
+
+    long msdRadixSortTime = 0;
+    long histogramPrepareTime = 0;
+    long swapTime = 0;
+    long histogramBuildTime = 0;
+    int msdRadixSortCall = 0;
+
+    public RadixSortContext(UnSafeTuple[] in, Schema schema, SortSpec[] 
sortSpecs, Comparator<UnSafeTuple> comparator,
+                            int timSortThreshold) {
+      this.in = in;
+      this.out = new UnSafeTuple[in.length];
+      this.keys = new int[in.length];
+      this.maxSortKeyId = sortSpecs.length - 1;
+      this.sortKeyIds = new int[sortSpecs.length];
+      sortKeyTypes = new Type[sortSpecs.length];
+      asc = new boolean[sortSpecs.length];
+      nullFirst = new boolean[sortSpecs.length];
+      for (int i = 0; i < sortSpecs.length; i++) {
+        if (sortSpecs[i].getSortKey().hasQualifier()) {
+          this.sortKeyIds[i] = 
schema.getColumnId(sortSpecs[i].getSortKey().getQualifiedName());
+        } else {
+          this.sortKeyIds[i] = 
schema.getColumnIdByName(sortSpecs[i].getSortKey().getSimpleName());
+        }
+        this.asc[i] = sortSpecs[i].isAscending();
+        this.nullFirst[i] = sortSpecs[i].isNullsFirst();
+        this.sortKeyTypes[i] = 
sortSpecs[i].getSortKey().getDataType().getType();
+      }
+      this.comparator = comparator;
+      this.timSortThreshold = timSortThreshold;
+    }
+
+    public void printStat() {
+      LOG.info("- msdRadixSortTime: " + msdRadixSortTime + " ms");
+      LOG.info("\t|- histogramPrepareTime: " + histogramPrepareTime + " ms");
+      LOG.info("\t\t|- histogramBuildTime: " + histogramBuildTime + " ms");
+      LOG.info("\t|- swapTime: " + swapTime + " ms");
+      LOG.info("- msdRadixSortCall: " + msdRadixSortCall + " times");
+    }
+  }
+
+  /**
+   * Entry method.
+   *
+   * @param list
+   * @param schema input schema
+   * @param sortSpecs sort specs
+   * @param comp comparator for Tim sort
+   * @return a sorted list of tuples
+   */
+  public static List<UnSafeTuple> sort(QueryContext queryContext, 
UnSafeTupleList list, Schema schema, SortSpec[] sortSpecs,
+                                       Comparator<UnSafeTuple> comp) {
+    UnSafeTuple[] in = list.toArray(new UnSafeTuple[list.size()]);
+    RadixSortContext context = new RadixSortContext(in, schema, sortSpecs, 
comp,
+        
queryContext.getInt(SessionVars.TEST_TIM_SORT_THRESHOLD_FOR_RADIX_SORT));
+
+    long before = System.currentTimeMillis();
+    recursiveCallForNextKey(context, 0, context.in.length, 0);
+    context.msdRadixSortTime += System.currentTimeMillis() - before;
+    context.printStat();
+    ListIterator<UnSafeTuple> it = list.listIterator();
+    for (UnSafeTuple t : context.in) {
+      it.next();
+      it.set(t);
+    }
+    return list;
+  }
+
+  static void recursiveCallForNextKey(RadixSortContext context, int start, int 
exclusiveEnd, int curSortKeyIdx) {
+    if (needConsiderSign(context.sortKeyTypes[curSortKeyIdx])) {
+      if (TajoTypeUtil.isReal(context.sortKeyTypes[curSortKeyIdx])) {
+        msdTernaryRadixSort(context, start, exclusiveEnd, curSortKeyIdx, 
context.asc[curSortKeyIdx],
+            calculateInitialPass(context.sortKeyTypes[curSortKeyIdx]));
+      } else {
+        msdRadixSort(context, start, exclusiveEnd, curSortKeyIdx, 
context.asc[curSortKeyIdx],
+            calculateInitialPass(context.sortKeyTypes[curSortKeyIdx]), true);
+      }
+    } else {
+      msdRadixSort(context, start, exclusiveEnd, curSortKeyIdx, 
context.asc[curSortKeyIdx],
+          calculateInitialPass(context.sortKeyTypes[curSortKeyIdx]), false);
+    }
+  }
+
+  static boolean needConsiderSign(Type type) {
+    switch (type) {
+      case INT2:
+      case INT4:
+      case INT8:
+      case TIME:
+      case TIMESTAMP:
+      case FLOAT4:
+      case FLOAT8:
+        return true;
+      case DATE:
+      case INET4:
+        return false;
+      default:
+        throw new TajoInternalError(new UnsupportedException(type.name()));
+    }
+  }
+
+  private static int getFieldOffset(long address, int fieldId) {
+    return PlatformDependent.getInt(address + (long)(SizeOf.SIZE_OF_INT + 
(fieldId * SizeOf.SIZE_OF_INT)));
+  }
+
+  private static long getFieldAddr(long address, int fieldId) {
+    return address + getFieldOffset(address, fieldId);
+  }
+
+  /**
+   * Get a radix key from a column values of the given tuple.
+   * The sign of the column value should be considered.
+   *
+   * @param tuple
+   * @param sortKeyId
+   * @param pass
+   * @return
+   */
+  static int ascNullLastSignConsidered16RadixKey(UnSafeTuple tuple, int 
sortKeyId, int pass) {
+    int key = _16BIT_NULL_LAST_IDX; // for null
+    if (!tuple.isBlankOrNull(sortKeyId)) {
+      // For negative values, the key should be 1 ~ 32768. For positive 
values, the key should be 32769 ~ 65536.
+      key = PlatformDependent.getShort(getFieldAddr(tuple.address(), 
sortKeyId) + (pass)) + _16BIT_SECOND_HALF_START_IDX;
+    }
+    return key;
+  }
+
+  /**
+   * Get a 16-bit radix key from a column values of the given tuple.
+   * The sign of the column value should be considered.
+   *
+   * @param tuple
+   * @param sortKeyId
+   * @param pass
+   * @return
+   */
+  static int ascNullFirstSignConsidered16RadixKey(UnSafeTuple tuple, int 
sortKeyId, int pass) {
+    int key = _16BIT_NULL_FIRST_IDX; // for null
+    if (!tuple.isBlankOrNull(sortKeyId)) {
+      // For negative values, the key should be 1 ~ 32768. For positive 
values, the key should be 32769 ~ 65536.
+      key = PlatformDependent.getShort(getFieldAddr(tuple.address(), 
sortKeyId) + (pass)) + _16BIT_SECOND_HALF_START_IDX;
+    }
+    return key;
+  }
+
+  /**
+   * Get a 16-bit radix key from a column values of the given tuple.
+   * The sign of the column value should be considered.
+   *
+   * @param tuple
+   * @param sortKeyId
+   * @param pass
+   * @return
+   */
+  static int descNullLastSignConsidered16RadixKey(UnSafeTuple tuple, int 
sortKeyId, int pass) {
+    int key = _16BIT_NULL_LAST_IDX; // for null
+    if (!tuple.isBlankOrNull(sortKeyId)) {
+      // For positive values, the key should be 1 ~ 32768. For negative 
values, the key should be 32769 ~ 65536.
+      key = _16BIT_FIRST_HALF_END_IDX - 
PlatformDependent.getShort(getFieldAddr(tuple.address(), sortKeyId) + (pass));
+    }
+    return key;
+  }
+
+  /**
+   * Get a 16-bit radix key from a column values of the given tuple.
+   * The sign of the column value should be considered.
+   *
+   * @param tuple
+   * @param sortKeyId
+   * @param pass
+   * @return
+   */
+  static int descNullFirstSignConsidered16RadixKey(UnSafeTuple tuple, int 
sortKeyId, int pass) {
+    int key = _16BIT_NULL_FIRST_IDX; // for null
+    if (!tuple.isBlankOrNull(sortKeyId)) {
+      // For positive values, the key should be 1 ~ 32768. For negative 
values, the key should be 32769 ~ 65536.
+      key = _16BIT_FIRST_HALF_END_IDX - 
PlatformDependent.getShort(getFieldAddr(tuple.address(), sortKeyId) + (pass));
+    }
+    return key;
+  }
+
+  /**
+   * Get a 16-bit radix key from a column values of the given tuple.
+   * The return key is an unsigned short value.
+   *
+   * @param tuple
+   * @param sortKeyId
+   * @param pass
+   * @return
+   */
+  static int ascNullLast16RadixKey(UnSafeTuple tuple, int sortKeyId, int pass) 
{
+    int key = _16BIT_NULL_LAST_IDX; // for null
+    if (!tuple.isBlankOrNull(sortKeyId)) {
+      key = 1 + (PlatformDependent.getShort(getFieldAddr(tuple.address(), 
sortKeyId) + (pass)) & SHORT_UNSIGNED_MASK);
+    }
+    return key;
+  }
+
+  /**
+   * Get a 16-bit radix key from a column values of the given tuple.
+   * The return key is an unsigned short value.
+   *
+   * @param tuple
+   * @param sortKeyId
+   * @param pass
+   * @return
+   */
+  static int ascNullFirst16RadixKey(UnSafeTuple tuple, int sortKeyId, int 
pass) {
+    int key = _16BIT_NULL_FIRST_IDX; // for null
+    if (!tuple.isBlankOrNull(sortKeyId)) {
+      key = 1 + (PlatformDependent.getShort(getFieldAddr(tuple.address(), 
sortKeyId) + (pass)) & SHORT_UNSIGNED_MASK);
+    }
+    return key;
+  }
+
+  /**
+   * Get a 16-bit radix key from a column values of the given tuple.
+   * The return key is an unsigned short value.
+   *
+   * @param tuple
+   * @param sortKeyId
+   * @param pass
+   * @return
+   */
+  static int descNullLast16RadixKey(UnSafeTuple tuple, int sortKeyId, int 
pass) {
+    int key = _16BIT_NULL_LAST_IDX; // for null
+    if (!tuple.isBlankOrNull(sortKeyId)) {
+      key = _16BIT_MAX_BIN_IDX - 
(PlatformDependent.getShort(getFieldAddr(tuple.address(), sortKeyId) + (pass)) 
& SHORT_UNSIGNED_MASK);
+    }
+    return key;
+  }
+
+  /**
+   * Get a 16-bit radix key from a column values of the given tuple.
+   * The return key is an unsigned short value.
+   *
+   * @param tuple
+   * @param sortKeyId
+   * @param pass
+   * @return
+   */
+  static int descNullFirst16RadixKey(UnSafeTuple tuple, int sortKeyId, int 
pass) {
+    int key = _16BIT_NULL_FIRST_IDX; // for null
+    if (!tuple.isBlankOrNull(sortKeyId)) {
+      key = _16BIT_MAX_BIN_IDX - 
(PlatformDependent.getShort(getFieldAddr(tuple.address(), sortKeyId) + (pass)) 
& SHORT_UNSIGNED_MASK);
+    }
+    return key;
+  }
+
+  /**
+   * Calculate positions of the input tuples.
+   *
+   * @param context
+   * @param start
+   * @param exclusiveEnd
+   * @param curSortKeyIdx
+   * @param pass
+   * @param positions
+   * @param keys
+   */
+  static void prepare16AscNullLastSignConsideredHistogram(RadixSortContext 
context, int start, int exclusiveEnd,
+                                                          int curSortKeyIdx, 
int pass, int[] positions, int[] keys) {
+    for (int i = start; i < exclusiveEnd; i++) {
+      keys[i] = ascNullLastSignConsidered16RadixKey(context.in[i], 
context.sortKeyIds[curSortKeyIdx], pass);
+      positions[keys[i]] += 1;
+    }
+  }
+
+  /**
+   * Calculate positions of the input tuples.
+   *
+   * @param context
+   * @param start
+   * @param exclusiveEnd
+   * @param curSortKeyIdx
+   * @param pass
+   * @param positions
+   * @param keys
+   */
+  static void prepare16AscNullFirstSignConsideredHistogram(RadixSortContext 
context, int start, int exclusiveEnd,
+                                                           int curSortKeyIdx, 
int pass, int[] positions, int[] keys) {
+    for (int i = start; i < exclusiveEnd; i++) {
+      keys[i] = ascNullFirstSignConsidered16RadixKey(context.in[i], 
context.sortKeyIds[curSortKeyIdx], pass);
+      positions[keys[i]] += 1;
+    }
+  }
+
+  /**
+   * Calculate positions of the input tuples.
+   *
+   * @param context
+   * @param start
+   * @param exclusiveEnd
+   * @param curSortKeyIdx
+   * @param pass
+   * @param positions
+   * @param keys
+   */
+  static void prepare16DescNullLastSignConsideredHistogram(RadixSortContext 
context, int start, int exclusiveEnd,
+                                                           int curSortKeyIdx, 
int pass, int[] positions, int[] keys) {
+    for (int i = start; i < exclusiveEnd; i++) {
+      keys[i] = descNullLastSignConsidered16RadixKey(context.in[i], 
context.sortKeyIds[curSortKeyIdx], pass);
+      positions[keys[i]] += 1;
+    }
+  }
+
+  /**
+   * Calculate positions of the input tuples.
+   *
+   * @param context
+   * @param start
+   * @param exclusiveEnd
+   * @param curSortKeyIdx
+   * @param pass
+   * @param positions
+   * @param keys
+   */
+  static void prepare16DescNullFirstSignConsideredHistogram(RadixSortContext 
context, int start, int exclusiveEnd,
+                                                            int curSortKeyIdx, 
int pass, int[] positions, int[] keys) {
+    for (int i = start; i < exclusiveEnd; i++) {
+      keys[i] = descNullFirstSignConsidered16RadixKey(context.in[i], 
context.sortKeyIds[curSortKeyIdx], pass);
+      positions[keys[i]] += 1;
+    }
+  }
+
+  /**
+   * Calculate positions of the input tuples.
+   *
+   * @param context
+   * @param start
+   * @param exclusiveEnd
+   * @param curSortKeyIdx
+   * @param pass
+   * @param positions
+   * @param keys
+   */
+  static void prepare16AscNullLastHistogram(RadixSortContext context, int 
start, int exclusiveEnd, int curSortKeyIdx,
+                                            int pass, int[] positions, int[] 
keys) {
+    for (int i = start; i < exclusiveEnd; i++) {
+      keys[i] = ascNullLast16RadixKey(context.in[i], 
context.sortKeyIds[curSortKeyIdx], pass);
+      positions[keys[i]] += 1;
+    }
+  }
+
+  /**
+   * Calculate positions of the input tuples.
+   *
+   * @param context
+   * @param start
+   * @param exclusiveEnd
+   * @param curSortKeyIdx
+   * @param pass
+   * @param positions
+   * @param keys
+   */
+  static void prepare16AscNullFirstHistogram(RadixSortContext context, int 
start, int exclusiveEnd, int curSortKeyIdx,
+                                             int pass, int[] positions, int[] 
keys) {
+    for (int i = start; i < exclusiveEnd; i++) {
+      keys[i] = ascNullFirst16RadixKey(context.in[i], 
context.sortKeyIds[curSortKeyIdx], pass);
+      positions[keys[i]] += 1;
+    }
+  }
+
+  /**
+   * Calculate positions of the input tuples.
+   *
+   * @param context
+   * @param start
+   * @param exclusiveEnd
+   * @param curSortKeyIdx
+   * @param pass
+   * @param positions
+   * @param keys
+   */
+  static void prepare16DescNullLastHistogram(RadixSortContext context, int 
start, int exclusiveEnd, int curSortKeyIdx,
+                                             int pass, int[] positions, int[] 
keys) {
+    for (int i = start; i < exclusiveEnd; i++) {
+      keys[i] = descNullLast16RadixKey(context.in[i], 
context.sortKeyIds[curSortKeyIdx], pass);
+      positions[keys[i]] += 1;
+    }
+  }
+
+  /**
+   * Calculate positions of the input tuples.
+   *
+   * @param context
+   * @param start
+   * @param exclusiveEnd
+   * @param curSortKeyIdx
+   * @param pass
+   * @param positions
+   * @param keys
+   */
+  static void prepare16DescNullFirstHistogram(RadixSortContext context, int 
start, int exclusiveEnd, int curSortKeyIdx,
+                                              int pass, int[] positions, int[] 
keys) {
+    for (int i = start; i < exclusiveEnd; i++) {
+      keys[i] = descNullFirst16RadixKey(context.in[i], 
context.sortKeyIds[curSortKeyIdx], pass);
+      positions[keys[i]] += 1;
+    }
+  }
+
+  static void buildHistogram(RadixSortContext context, int start, int[] 
positions) {
+    long before = System.currentTimeMillis();
+    positions[0] += start;
+    for (int i = 0; i < positions.length - 1; i++) {
+      positions[i + 1] += positions[i];
+    }
+    context.histogramBuildTime += System.currentTimeMillis() - before;
+  }
+
+  private final static int _16BIT_BIN_NUM = 65538;
+  private final static int _16BIT_NULL_FIRST_IDX = 0;
+  private final static int _16BIT_NULL_LAST_IDX = 65537;
+  private final static int _16BIT_MAX_BIN_IDX = 65536;
+  private final static int _16BIT_FIRST_HALF_END_IDX = 32768;
+  private final static int _16BIT_SECOND_HALF_START_IDX = 32769;
+  private final static int SHORT_UNSIGNED_MASK = 0xFFFF;
+
+  /**
+   * Sort the specified part of the input tuples.
+   * If the length of the part is sufficiently large, recursively call 
msdRadixSort(). Otherwise, call Arrays.sort().
+   *
+   * @param context radix sort context
+   * @param start start position of the part will be sorted.
+   * @param exclusiveEnd end position of the part will be sorted.
+   * @param curSortKeyIdx current sort key index
+   * @param asc ascending flag
+   * @param pass current pass
+   * @param considerSign a flag to represent that the sign must be considered
+   */
+  static void msdRadixSort(RadixSortContext context, int start, int 
exclusiveEnd, int curSortKeyIdx, boolean asc,
+                           int pass, boolean considerSign) {
+    context.msdRadixSortCall++;
+
+    // This array contains the end positions of bins. For example, suppose an 
input which consists of nulls and numbers
+    // of 1 ~ 9. The number of each numbers and null is 10. If this input is 
organized into 12 bins which have the equal
+    // length of 10, this array will contain the below values.
+    //
+    // Ex) asc, null first
+    //
+    //   [0]  [1]  [2]  [3]  [4]  [5]  [6]  [7]  [8]  [9]  [10]  [11] // 0th 
and 11th bins are reserved for null values
+    //   10   20   30   40   50   60   70   80   90   100   100   100
+    //
+    // If the considerSign flag is set, this array is used for both negative 
and positive values.
+    // The positions of both values are determined by the asc flag.
+    // When the asc flag is set, the first half of the array is used for 
negative values.
+    // Otherwise, the second half of the array is used for negative values.
+    //
+    // Note: too many recursive calls to msdRadixSort() can incur a lot of 
memory overhead because this array should be
+    // always newly created when it is called.
+    final int[] binEndIdx = new int[_16BIT_BIN_NUM];
+
+    // An array to cache radix keys which are gotten while building the 
histogram.
+    // Since getting keys is the most expensive part of this implementation, 
keys should be cached once they are gotten.
+    final int[] keys = context.keys;
+
+    // TODO: consider the current key type
+    long before = System.currentTimeMillis();
+    // Build a histogram.
+    // Call different methods depending on the sort spec of the current key. 
This is to avoid frequent branch
+    // mispredictions. This is effective because the below code block is the 
most expensive part of this implementation.
+    // TODO: code generation can simplify the below codes.
+    if (considerSign) {
+      if (asc) {
+        if (context.nullFirst[curSortKeyIdx]) {
+          prepare16AscNullFirstSignConsideredHistogram(context, start, 
exclusiveEnd, curSortKeyIdx, pass, binEndIdx,
+              keys);
+        } else {
+          prepare16AscNullLastSignConsideredHistogram(context, start, 
exclusiveEnd, curSortKeyIdx, pass, binEndIdx,
+              keys);
+        }
+      } else {
+        if (context.nullFirst[curSortKeyIdx]) {
+          prepare16DescNullFirstSignConsideredHistogram(context, start, 
exclusiveEnd, curSortKeyIdx, pass, binEndIdx,
+              keys);
+        } else {
+          prepare16DescNullLastSignConsideredHistogram(context, start, 
exclusiveEnd, curSortKeyIdx, pass, binEndIdx,
+              keys);
+        }
+      }
+    } else {
+      if (asc) {
+        if (context.nullFirst[curSortKeyIdx]) {
+          prepare16AscNullFirstHistogram(context, start, exclusiveEnd, 
curSortKeyIdx, pass, binEndIdx, keys);
+        } else {
+          prepare16AscNullLastHistogram(context, start, exclusiveEnd, 
curSortKeyIdx, pass, binEndIdx, keys);
+        }
+      } else {
+        if (context.nullFirst[curSortKeyIdx]) {
+          prepare16DescNullFirstHistogram(context, start, exclusiveEnd, 
curSortKeyIdx, pass, binEndIdx, keys);
+        } else {
+          prepare16DescNullLastHistogram(context, start, exclusiveEnd, 
curSortKeyIdx, pass, binEndIdx, keys);
+        }
+      }
+    }
+    context.histogramPrepareTime += System.currentTimeMillis() - before;
+
+    // Swap tuples if necessary.
+    // If every tuple has the same radix key, tuples don't have to be swapped.
+
+    boolean needSwap = Arrays.stream(binEndIdx).filter(eachCount -> eachCount 
> 0).count() > 1;
+    buildHistogram(context, start, binEndIdx);
+    if (needSwap) {
+      before = System.currentTimeMillis();
+      final int[] binNextElemIdx = new int[_16BIT_BIN_NUM];
+      System.arraycopy(binEndIdx, 0, binNextElemIdx, 0, _16BIT_BIN_NUM);
+      for (int i = start; i < exclusiveEnd; i++) {
+        context.out[--binNextElemIdx[keys[i]]] = context.in[i];
+      }
+      System.arraycopy(context.out, start, context.in, start, exclusiveEnd - 
start);
+      context.swapTime += System.currentTimeMillis() - before;
+    }
+
+    // Recursive call radix sort if necessary.
+    if (pass > 0 || curSortKeyIdx < context.maxSortKeyId) {
+      boolean nextKey = pass == 0;
+      int len = binEndIdx[0] - start;
+
+      if (len > 1) {
+        // Use the tim sort when the array length is sufficiently small.
+        if (len < context.timSortThreshold) {
+          Arrays.sort(context.in, start, binEndIdx[0], context.comparator);
+        } else {
+          if (nextKey) {
+            recursiveCallForNextKey(context, start, binEndIdx[0], 
curSortKeyIdx + 1);
+          } else {
+            msdRadixSort(context, start, binEndIdx[0], curSortKeyIdx, asc, 
pass - 2, false);
+          }
+        }
+      }
+
+      for (int i = 0; i < _16BIT_MAX_BIN_IDX && binEndIdx[i] < exclusiveEnd; 
i++) {
+        len = binEndIdx[i + 1] - binEndIdx[i];
+        if (len > 1) {
+          // Use the tim sort when the array length is sufficiently small.
+          if (len < context.timSortThreshold) {
+            Arrays.sort(context.in, binEndIdx[i], binEndIdx[i + 1], 
context.comparator);
+          } else {
+            if (nextKey) {
+              recursiveCallForNextKey(context, binEndIdx[i], binEndIdx[i + 1], 
curSortKeyIdx + 1);
+            } else {
+              msdRadixSort(context, binEndIdx[i], binEndIdx[i + 1], 
curSortKeyIdx, asc, pass - 2, false);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  // Below methods are only used for floating point types.
+
+  /**
+   * Get a 1-bit radix key from a column values of the given tuple.
+   * The keys of 0 and 3 are reserved for null values.
+   *
+   * @param tuple
+   * @param sortKeyId
+   * @param pass
+   * @return
+   */
+  static int ascNullFirst1bRadixKey(UnSafeTuple tuple, int sortKeyId, int 
pass) {
+    int key = 0; // for null
+    if (!tuple.isBlankOrNull(sortKeyId)) {
+      key = 2 - ((PlatformDependent.getShort(getFieldAddr(tuple.address(), 
sortKeyId) + (pass)) & 0xFFFF) >> 15);
+    }
+    return key;
+  }
+
+  /**
+   * Get a 1-bit radix key from a column values of the given tuple.
+   * The keys of 0 and 3 are reserved for null values.
+   *
+   * @param tuple
+   * @param sortKeyId
+   * @param pass
+   * @return
+   */
+  static int ascNullLast1bRadixKey(UnSafeTuple tuple, int sortKeyId, int pass) 
{
+    int key = _1BIT_BIN_MAX_IDX; // for null
+    if (!tuple.isBlankOrNull(sortKeyId)) {
+      key = 2 - ((PlatformDependent.getShort(getFieldAddr(tuple.address(), 
sortKeyId) + (pass)) & 0xFFFF) >> 15);
+    }
+    return key;
+  }
+
+  /**
+   * Get a 1-bit radix key from a column values of the given tuple.
+   * The keys of 0 and 3 are reserved for null values.
+   *
+   * @param tuple
+   * @param sortKeyId
+   * @param pass
+   * @return
+   */
+  static int descNullFirst1bRadixKey(UnSafeTuple tuple, int sortKeyId, int 
pass) {
+    int key = 0; // for null
+    if (!tuple.isBlankOrNull(sortKeyId)) {
+      key = 1 + ((PlatformDependent.getShort(getFieldAddr(tuple.address(), 
sortKeyId) + (pass)) & 0xFFFF) >> 15);
+    }
+    return key;
+  }
+
+  /**
+   * Get a 1-bit radix key from a column values of the given tuple.
+   * The keys of 0 and 3 are reserved for null values.
+   *
+   * @param tuple
+   * @param sortKeyId
+   * @param pass
+   * @return
+   */
+  static int descNullLast1bRadixKey(UnSafeTuple tuple, int sortKeyId, int 
pass) {
+    int key = _1BIT_BIN_MAX_IDX; // for null
+    if (!tuple.isBlankOrNull(sortKeyId)) {
+      key = 1 + ((PlatformDependent.getShort(getFieldAddr(tuple.address(), 
sortKeyId) + (pass)) & 0xFFFF) >> 15);
+    }
+    return key;
+  }
+
+  /**
+   * Calculate positions of the input tuples.
+   *
+   * @param context
+   * @param start
+   * @param exclusiveEnd
+   * @param curSortKeyIdx
+   * @param pass
+   * @param positions
+   * @param keys
+   */
+  static void prepare1bAscNullFirstHistogram(RadixSortContext context, int 
start, int exclusiveEnd, int curSortKeyIdx,
+                                             int pass, int[] positions, int[] 
keys) {
+    for (int i = start; i < exclusiveEnd; i++) {
+      keys[i] = ascNullFirst1bRadixKey(context.in[i], 
context.sortKeyIds[curSortKeyIdx], pass);
+      positions[keys[i]] += 1;
+    }
+  }
+
+  /**
+   * Calculate positions of the input tuples.
+   *
+   * @param context
+   * @param start
+   * @param exclusiveEnd
+   * @param curSortKeyIdx
+   * @param pass
+   * @param positions
+   * @param keys
+   */
+  static void prepare1bAscNullLastHistogram(RadixSortContext context, int 
start, int exclusiveEnd, int curSortKeyIdx,
+                                            int pass, int[] positions, int[] 
keys) {
+    for (int i = start; i < exclusiveEnd; i++) {
+      keys[i] = ascNullLast1bRadixKey(context.in[i], 
context.sortKeyIds[curSortKeyIdx], pass);
+      positions[keys[i]] += 1;
+    }
+  }
+
+  /**
+   * Calculate positions of the input tuples.
+   *
+   * @param context
+   * @param start
+   * @param exclusiveEnd
+   * @param curSortKeyIdx
+   * @param pass
+   * @param positions
+   * @param keys
+   */
+  static void prepare1bDescNullFirstHistogram(RadixSortContext context, int 
start, int exclusiveEnd, int curSortKeyIdx,
+                                              int pass, int[] positions, int[] 
keys) {
+    for (int i = start; i < exclusiveEnd; i++) {
+      keys[i] = descNullFirst1bRadixKey(context.in[i], 
context.sortKeyIds[curSortKeyIdx], pass);
+      positions[keys[i]] += 1;
+    }
+  }
+
+  /**
+   * Calculate positions of the input tuples.
+   *
+   * @param context
+   * @param start
+   * @param exclusiveEnd
+   * @param curSortKeyIdx
+   * @param pass
+   * @param positions
+   * @param keys
+   */
+  static void prepare1bDescNullLastHistogram(RadixSortContext context, int 
start, int exclusiveEnd, int curSortKeyIdx,
+                                             int pass, int[] positions, int[] 
keys) {
+    for (int i = start; i < exclusiveEnd; i++) {
+      keys[i] = descNullLast1bRadixKey(context.in[i], 
context.sortKeyIds[curSortKeyIdx], pass);
+      positions[keys[i]] += 1;
+    }
+  }
+
+  private final static int _1BIT_BIN_NUM = 4;
+  private final static int _1BIT_BIN_MAX_IDX = 3;
+
+  /**
+   * Sort the specified part of the input tuples when the current sort key has 
a floating point type.
+   * This method is called only once at the first pass.
+   *
+   * @param context radix sort context
+   * @param start start position of the part will be sorted
+   * @param exclusiveEnd end position of the part will be sorted
+   * @param curSortKeyIdx current sort key index
+   * @param asc ascending flag
+   * @param pass current pass
+   */
+  static void msdTernaryRadixSort(RadixSortContext context, int start, int 
exclusiveEnd, int curSortKeyIdx, boolean asc,
+                                  int pass) {
+    context.msdRadixSortCall++;
+
+    // The values of floating point types are organized into three groups, 
i.e., positives, negatives, and nulls.
+    // The positions for these groups are stored in an integer array of length 
4.
+    // The first and last slots are reserved for null values.
+    // The second and third slots are used for positives and negatives 
depending on the ascending order specification.
+    // If the ascending order is specified, negatives come first. Otherwise, 
positives come first.
+    // Ex) asc, null first
+    //
+    // [ nulls ] [ negatives ] [ positives ] [ empty ]
+    //
+    final int[] binEndIdx = new int[_1BIT_BIN_NUM];
+
+    // An array to cache radix keys which are gotten while building the 
histogram.
+    // Since getting keys is the most expensive part of this implementation, 
keys should be cached once they are gotten.
+    final int[] keys = context.keys;
+
+    // TODO: consider the current key type
+    long before = System.currentTimeMillis();
+    // Build a histogram.
+    // Call different methods depending on the sort spec of the current key. 
This is to avoid frequent branch
+    // mispredictions. This is effective because the below code block is the 
most expensive part of this implementation.
+    // TODO: code generation can simplify the below codes.
+    if (asc) {
+      if (context.nullFirst[curSortKeyIdx]) {
+        prepare1bAscNullFirstHistogram(context, start, exclusiveEnd, 
curSortKeyIdx, pass, binEndIdx, keys);
+      } else {
+        prepare1bAscNullLastHistogram(context, start, exclusiveEnd, 
curSortKeyIdx, pass, binEndIdx, keys);
+      }
+    } else {
+      if (context.nullFirst[curSortKeyIdx]) {
+        prepare1bDescNullFirstHistogram(context, start, exclusiveEnd, 
curSortKeyIdx, pass, binEndIdx, keys);
+      } else {
+        prepare1bDescNullLastHistogram(context, start, exclusiveEnd, 
curSortKeyIdx, pass, binEndIdx, keys);
+      }
+    }
+
+    context.histogramPrepareTime += System.currentTimeMillis() - before;
+
+    // Swap tuples if necessary.
+    // If every tuple has the same radix key, tuples don't have to be swapped.
+    boolean needSwap = Arrays.stream(binEndIdx).filter(eachCount -> eachCount 
> 0).count() > 1;
+    buildHistogram(context, start, binEndIdx);
+    if (needSwap) {
+      before = System.currentTimeMillis();
+      final int[] binNextElemIdx = new int[_1BIT_BIN_NUM];
+      System.arraycopy(binEndIdx, 0, binNextElemIdx, 0, _1BIT_BIN_NUM);
+      for (int i = start; i < exclusiveEnd; i++) {
+        context.out[--binNextElemIdx[keys[i]]] = context.in[i];
+      }
+      System.arraycopy(context.out, start, context.in, start, exclusiveEnd - 
start);
+      context.swapTime += System.currentTimeMillis() - before;
+    }
+
+    // Recursively call radix sort
+    if (context.nullFirst[curSortKeyIdx]) {
+      // The bin with null values doesn't have to be sorted anymore. As a 
result, call sort for the next key directly.
+      if (curSortKeyIdx < context.maxSortKeyId) {
+        recursiveCallForNextKey(context, start, binEndIdx[0], curSortKeyIdx + 
1);
+      }
+    } else {
+      // The bin with null values doesn't have to be sorted anymore. As a 
result, call sort for the next key directly.
+      if (curSortKeyIdx < context.maxSortKeyId) {
+        recursiveCallForNextKey(context, binEndIdx[2], binEndIdx[3], 
curSortKeyIdx + 1);
+      }
+    }
+
+    int len = binEndIdx[1] - binEndIdx[0];
+
+    if (len > 1) {
+      // Use the tim sort when the array length is sufficiently small.
+      if (len < context.timSortThreshold) {
+        Arrays.sort(context.in, binEndIdx[0], binEndIdx[1], 
context.comparator);
+      } else {
+        msdRadixSort(context, binEndIdx[0], binEndIdx[1], curSortKeyIdx, 
false, pass, false);
+      }
+    }
+
+    len = binEndIdx[2] - binEndIdx[1];
+
+    if (len > 1) {
+      // Use the tim sort when the array length is sufficiently small.
+      if (len < context.timSortThreshold) {
+        Arrays.sort(context.in, binEndIdx[1], binEndIdx[2], 
context.comparator);
+      } else {
+        msdRadixSort(context, binEndIdx[1], binEndIdx[2], curSortKeyIdx, true, 
pass, false);
+      }
+    }
+  }
+
+  static int calculateInitialPass(Type type) {
+    int initialPass = typeByteSize(type) - 2;
+    return initialPass < 0 ? 0 : initialPass;
+  }
+
+  static int typeByteSize(Type type) {
+    switch (type) {
+      case INT2:
+        return 2;
+      case INT4:
+      case FLOAT4:
+        return 4;
+      case INT8:
+      case FLOAT8:
+        return 8;
+      case INET4:
+        return 4;
+      case DATE:
+        return 4;
+      case TIME:
+        return 8;
+      case TIMESTAMP:
+        return 8;
+      default:
+        throw new TajoRuntimeException(new UnsupportedException(type.name()));
+    }
+  }
+
+  /**
+   * Returns whether this implementation supports the given type or not.
+   *
+   * @param sortSpec
+   * @return
+   */
+  public static boolean isApplicableType(SortSpec sortSpec) {
+    switch (sortSpec.getSortKey().getDataType().getType()) {
+      // Variable length types are not supported.
+      case CHAR:
+      case TEXT:
+      case BLOB:
+        return false;
+      // 1 byte types are not supported.
+      case BOOLEAN:
+      case BIT:
+        return false;
+      default:
+        return true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
index d627064..4d93017 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
@@ -449,7 +449,7 @@ public class NonForwardQueryResultSystemScanner implements 
NonForwardQueryResult
           aTuple.put(fieldId, 
DatumFactory.createInt4(aNodeStatus.getNumRunningQueryMaster()));
         } else if 
("last_heartbeat_ts".equalsIgnoreCase(column.getSimpleName())) {
           if (aNodeStatus.getLastHeartbeatTime() > 0) {
-            aTuple.put(fieldId, 
DatumFactory.createTimestmpDatumWithJavaMillis(aNodeStatus.getLastHeartbeatTime()));
+            aTuple.put(fieldId, 
DatumFactory.createTimestampDatumWithJavaMillis(aNodeStatus.getLastHeartbeatTime()));
           } else {
             aTuple.put(fieldId, DatumFactory.createNullDatum());
           }
@@ -503,7 +503,7 @@ public class NonForwardQueryResultSystemScanner implements 
NonForwardQueryResult
           aTuple.put(fieldId, 
DatumFactory.createInt4(aNodeStatus.getNumRunningTasks()));
         } else if 
("last_heartbeat_ts".equalsIgnoreCase(column.getSimpleName())) {
           if (aNodeStatus.getLastHeartbeatTime() > 0) {
-            aTuple.put(fieldId, 
DatumFactory.createTimestmpDatumWithJavaMillis(aNodeStatus.getLastHeartbeatTime()));
+            aTuple.put(fieldId, 
DatumFactory.createTimestampDatumWithJavaMillis(aNodeStatus.getLastHeartbeatTime()));
           } else {
             aTuple.put(fieldId, DatumFactory.createNullDatum());
           }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 08ff184..fed75cd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -875,8 +875,8 @@ public class Stage implements EventHandler<StageEvent> {
                                   new StageEvent(stage.getId(), 
StageEventType.SQ_STAGE_COMPLETED));
                             } else {
                               if(stage.getSynchronizedState() == 
StageState.INITED) {
-                                stage.taskScheduler.start();
                                 stage.eventHandler.handle(new 
StageEvent(stage.getId(), StageEventType.SQ_START));
+                                stage.taskScheduler.start();
                               } else {
                                 /* all tasks are killed before stage are 
inited */
                                 if (stage.getTotalScheduledObjectsCount() == 
stage.getCompletedTaskCount()) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-plan/src/main/java/org/apache/tajo/plan/logical/SortNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/SortNode.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/SortNode.java
index a31ec5a..e856ac0 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/SortNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/SortNode.java
@@ -18,20 +18,20 @@
 
 package org.apache.tajo.plan.logical;
 
-import java.util.Arrays;
-
 import com.google.common.base.Preconditions;
 import com.google.gson.annotations.Expose;
-
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.plan.PlanString;
 import org.apache.tajo.util.TUtil;
 
+import java.util.Arrays;
+
 public final class SortNode extends UnaryNode implements Cloneable {
   public enum SortPurpose {
     NORMAL,
     STORAGE_SPECIFIED
   }
+
        @Expose private SortSpec [] sortKeys;
   @Expose private SortPurpose sortPurpose;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index 7d2c649..ee1317f 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -402,7 +402,7 @@
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-compiler-plugin</artifactId>
-          <version>3.5</version>
+          <version>3.5.1</version>
           <configuration>
             <source>1.8</source>
             <target>1.8</target>
@@ -494,12 +494,12 @@
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-surefire-report-plugin</artifactId>
-          <version>2.19</version>
+          <version>2.19.1</version>
         </plugin>
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-surefire-plugin</artifactId>
-          <version>2.19</version>
+          <version>2.19.1</version>
           <configuration>
             <trimStackTrace>false</trimStackTrace>
           </configuration>
@@ -706,7 +706,6 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-report-plugin</artifactId>
-        <version>2.15</version>
       </plugin>
 
       <plugin>
@@ -1338,7 +1337,6 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-report-plugin</artifactId>
-        <version>2.19</version>
         <configuration>
           <aggregate>true</aggregate>
         </configuration>

http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
index 20a5d5c..442519b 100644
--- 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
@@ -21,7 +21,6 @@ package org.apache.tajo.storage;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Column;
 import sun.nio.ch.DirectBuffer;
 
 import java.io.DataInput;
@@ -32,43 +31,6 @@ import java.nio.ByteBuffer;
 
 public class StorageUtil extends StorageConstants {
 
-  public static int getColByteSize(Column col) {
-    switch (col.getDataType().getType()) {
-      case BOOLEAN:
-        return 1;
-      case CHAR:
-        return 1;
-      case BIT:
-        return 1;
-      case INT2:
-        return 2;
-      case INT4:
-        return 4;
-      case INT8:
-        return 8;
-      case FLOAT4:
-        return 4;
-      case FLOAT8:
-        return 8;
-      case INET4:
-        return 4;
-      case INET6:
-        return 32;
-      case TEXT:
-        return 256;
-      case BLOB:
-        return 256;
-      case DATE:
-        return 4;
-      case TIME:
-        return 8;
-      case TIMESTAMP:
-        return 8;
-      default:
-        return 0;
-    }
-  }
-
   public static Path concatPath(String parent, String...childs) {
     return concatPath(new Path(parent), childs);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index 64316d1..2e88398 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -965,7 +965,7 @@ public class TestStorages {
 
       VTuple tuple = new VTuple(index - 1);
       index = 0;
-      tuple.put(index++, 
DatumFactory.createTimestmpDatumWithUnixTime((int)(System.currentTimeMillis() / 
1000)));
+      tuple.put(index++, 
DatumFactory.createTimestampDatumWithUnixTime((int)(System.currentTimeMillis() 
/ 1000)));
       if (dateTypeSupport()) {
         tuple.put(index++, DatumFactory.createDate("1980-04-01"));
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java
 
b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java
index 82c3be3..d7e1c3b 100644
--- 
a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java
+++ 
b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java
@@ -235,7 +235,7 @@ public abstract class JdbcScanner implements Scanner {
           break;
         case TIMESTAMP:
           tuple.put(column_idx,
-              
DatumFactory.createTimestmpDatumWithJavaMillis(resultSet.getTimestamp(resultIdx).getTime()));
+              
DatumFactory.createTimestampDatumWithJavaMillis(resultSet.getTimestamp(resultIdx).getTime()));
           break;
         case BINARY:
         case VARBINARY:

Reply via email to