http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
new file mode 100644
index 0000000..178af47
--- /dev/null
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
@@ -0,0 +1,1143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.BaseTupleComparator;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleRange;
+import org.apache.tajo.storage.VTuple;
+import org.junit.Test;
+
+import java.math.BigInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestUniformRangePartition {
+
+  @Test
+  public void testPartitionForINT2Asc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.INT2);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(1);
+    VTuple e = new VTuple(1);
+    s.put(0, DatumFactory.createInt2((short) 1));
+    e.put(0, DatumFactory.createInt2((short) 30000));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForINT2Desc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.INT2);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+    sortSpecs[0].setDescOrder();
+
+    VTuple s = new VTuple(1);
+    VTuple e = new VTuple(1);
+    s.put(0, DatumFactory.createInt2((short) 30000));
+    e.put(0, DatumFactory.createInt2((short) 1));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForINT4Asc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.INT4);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(1);
+    VTuple e = new VTuple(1);
+    s.put(0, DatumFactory.createInt4(1));
+    e.put(0, DatumFactory.createInt4(10000));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForINT4Desc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.INT4);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+    sortSpecs[0].setDescOrder();
+
+    VTuple s = new VTuple(1);
+    VTuple e = new VTuple(1);
+    s.put(0, DatumFactory.createInt4(10000));
+    e.put(0, DatumFactory.createInt4(1));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForINT8Asc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.INT8);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(1);
+    VTuple e = new VTuple(1);
+    s.put(0, DatumFactory.createInt8(1));
+    e.put(0, DatumFactory.createInt8(10000));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForInt8Desc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.INT8);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+    sortSpecs[0].setDescOrder();
+
+    VTuple s = new VTuple(1);
+    VTuple e = new VTuple(1);
+    s.put(0, DatumFactory.createInt8(10000));
+    e.put(0, DatumFactory.createInt8(1));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForFloat4Asc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.FLOAT4);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(1);
+    VTuple e = new VTuple(1);
+    s.put(0, DatumFactory.createFloat4((float) 1.0));
+    e.put(0, DatumFactory.createFloat4((float) 10000.0));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForFloat4Desc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.FLOAT4);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+    sortSpecs[0].setDescOrder();
+
+    VTuple s = new VTuple(1);
+    VTuple e = new VTuple(1);
+    s.put(0, DatumFactory.createFloat4((float) 10000.0));
+    e.put(0, DatumFactory.createFloat4((float) 1.0));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForFloat8Asc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.FLOAT8);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(1);
+    VTuple e = new VTuple(1);
+    s.put(0, DatumFactory.createFloat8(1.0));
+    e.put(0, DatumFactory.createFloat8(10000.0));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForFloat8Desc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.FLOAT8);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+    sortSpecs[0].setDescOrder();
+
+    VTuple s = new VTuple(1);
+    VTuple e = new VTuple(1);
+    s.put(0, DatumFactory.createFloat8((float) 10000.0));
+    e.put(0, DatumFactory.createFloat8((float) 1.0));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  /**
+   * It verify overflow and increment in normal case.
+   */
+  @Test
+  public void testIncrementOfText() {
+    Schema schema = new Schema()
+        .addColumn("l_returnflag", Type.TEXT)
+        .addColumn("l_linestatus", Type.TEXT);
+
+    SortSpec[] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(2);
+    s.put(0, DatumFactory.createText("A"));
+    s.put(1, DatumFactory.createText("A"));
+    VTuple e = new VTuple(2);
+    e.put(0, DatumFactory.createText("D"));
+    e.put(1, DatumFactory.createText("C"));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    assertEquals(12, partitioner.getTotalCardinality().intValue());
+
+    String [] result = new String[12];
+    result[0] = "AA";
+    result[1] = "AB";
+    result[2] = "AC";
+    result[3] = "BA";
+    result[4] = "BB";
+    result[5] = "BC";
+    result[6] = "CA";
+    result[7] = "CB";
+    result[8] = "CC";
+    result[9] = "DA";
+    result[10] = "DB";
+    result[11] = "DC";
+
+    Tuple end = partitioner.increment(s, BigInteger.valueOf(1), 1);
+    assertEquals("A", end.getText(0));
+    assertEquals("B", end.getText(1));
+    for (int i = 2; i < 11; i++ ) {
+      end = partitioner.increment(end, BigInteger.valueOf(1), 1);
+      assertEquals(result[i].charAt(0), end.getText(0).charAt(0));
+      assertEquals(result[i].charAt(1), end.getText(1).charAt(0));
+    }
+  }
+
+  /**
+   * It verify overflow with the number that exceeds the last digit.
+   */
+  @Test
+  public void testIncrementOfText2() {
+    Schema schema = new Schema()
+        .addColumn("l_returnflag", Type.TEXT)
+        .addColumn("l_linestatus", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(2);
+    s.put(0, DatumFactory.createText("A"));
+    s.put(1, DatumFactory.createText("A"));
+    VTuple e = new VTuple(2);
+    e.put(0, DatumFactory.createText("D"));
+    e.put(1, DatumFactory.createText("C"));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    assertEquals(12, partitioner.getTotalCardinality().intValue());
+
+    String [] result = new String[12];
+    result[0] = "AA";
+    result[1] = "AB";
+    result[2] = "AC";
+    result[3] = "BA";
+    result[4] = "BB";
+    result[5] = "BC";
+    result[6] = "CA";
+    result[7] = "CB";
+    result[8] = "CC";
+    result[9] = "DA";
+    result[10] = "DB";
+    result[11] = "DC";
+
+    Tuple end = partitioner.increment(s, BigInteger.valueOf(6), 1);
+    assertEquals("C", end.getText(0));
+    assertEquals("A", end.getText(1));
+    end = partitioner.increment(end, BigInteger.valueOf(5), 1);
+    assertEquals("D", end.getText(0));
+    assertEquals("C", end.getText(1));
+  }
+
+  /**
+   * It verify the case where two or more digits are overflow.
+   */
+  @Test
+  public void testIncrementOfText3() {
+    Schema schema = new Schema()
+        .addColumn("l_returnflag", Type.TEXT)
+        .addColumn("l_linestatus", Type.TEXT)
+        .addColumn("final", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(3);
+    s.put(0, DatumFactory.createText("A"));
+    s.put(1, DatumFactory.createText("A"));
+    s.put(2, DatumFactory.createText("A"));
+    VTuple e = new VTuple(3);
+    e.put(0, DatumFactory.createText("D")); //  4
+    e.put(1, DatumFactory.createText("B")); //  2
+    e.put(2, DatumFactory.createText("C")); // x3 = 24
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    assertEquals(24, partitioner.getTotalCardinality().intValue());
+
+    Tuple overflowBefore = partitioner.increment(s, BigInteger.valueOf(5), 2);
+    assertEquals("A", overflowBefore.getText(0));
+    assertEquals("B", overflowBefore.getText(1));
+    assertEquals("C", overflowBefore.getText(2));
+    Tuple overflowed = partitioner.increment(overflowBefore, 
BigInteger.valueOf(1), 2);
+    assertEquals("B", overflowed.getText(0));
+    assertEquals("A", overflowed.getText(1));
+    assertEquals("A", overflowed.getText(2));
+  }
+
+  @Test
+  public void testIncrementOfUnicode() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(1);
+    s.put(0, DatumFactory.createText("가가가"));
+    VTuple e = new VTuple(1);
+    e.put(0, DatumFactory.createText("하하하"));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    BaseTupleComparator comp = new BaseTupleComparator(schema, sortSpecs);
+
+    Tuple tuple = s;
+    Tuple prevTuple = null;
+    for (int i = 0; i < 100; i++) {
+      tuple = partitioner.increment(tuple, BigInteger.valueOf(30000), 0);
+      if (prevTuple != null) {
+        assertTrue("prev=" + prevTuple + ", current=" + tuple, 
comp.compare(prevTuple, tuple) < 0);
+      }
+      prevTuple = tuple;
+    }
+  }
+
+  @Test
+  public void testIncrementOfUnicodeOneCharSinglePartition() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(1);
+    s.put(0, DatumFactory.createText("가"));
+    VTuple e = new VTuple(1);
+    e.put(0, DatumFactory.createText("다"));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    int partNum = 1;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testIncrementOfUnicodeOneCharMultiPartition() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(1);
+    s.put(0, DatumFactory.createText("가"));
+    VTuple e = new VTuple(1);
+    e.put(0, DatumFactory.createText("ê½¥"));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    int partNum = 8;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForUnicodeTextAsc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(1);
+    VTuple e = new VTuple(1);
+    s.put(0, DatumFactory.createText("가가가"));
+    e.put(0, DatumFactory.createText("하하하"));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForUnicodeDiffLenBeginTextAsc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(1);
+    VTuple e = new VTuple(1);
+    s.put(0, DatumFactory.createText("가"));
+    e.put(0, DatumFactory.createText("하하하"));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForUnicodeDiffLenEndTextAsc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(1);
+    VTuple e = new VTuple(1);
+    s.put(0, DatumFactory.createText("가가가"));
+    e.put(0, DatumFactory.createText("하"));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForUnicodeTextDesc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+    sortSpecs[0].setDescOrder();
+
+    VTuple s = new VTuple(1);
+    VTuple e = new VTuple(1);
+    s.put(0, DatumFactory.createText("하하하"));
+    e.put(0, DatumFactory.createText("가가가"));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForUnicodeDiffLenBeginTextDesc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+    sortSpecs[0].setDescOrder();
+
+    VTuple s = new VTuple(1);
+    VTuple e = new VTuple(1);
+    s.put(0, DatumFactory.createText("하"));
+    e.put(0, DatumFactory.createText("가가가"));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForUnicodeDiffLenEndTextDesc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+    sortSpecs[0].setDescOrder();
+
+    VTuple s = new VTuple(1);
+    VTuple e = new VTuple(1);
+    s.put(0, DatumFactory.createText("하"));
+    e.put(0, DatumFactory.createText("가가가"));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testIncrementOfInt8() {
+    Schema schema = new Schema()
+        .addColumn("l_orderkey", Type.INT8)
+        .addColumn("l_linenumber", Type.INT8);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(2);
+    s.put(0, DatumFactory.createInt8(10));
+    s.put(1, DatumFactory.createInt8(20));
+    VTuple e = new VTuple(2);
+    e.put(0, DatumFactory.createInt8(19));
+    e.put(1, DatumFactory.createInt8(39));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    assertEquals(200, partitioner.getTotalCardinality().longValue());
+
+    Tuple range2 = partitioner.increment(s, BigInteger.valueOf(100), 1);
+    assertEquals(15, range2.getInt4(0));
+    assertEquals(20, range2.getInt4(1));
+    Tuple range3 = partitioner.increment(range2, BigInteger.valueOf(99), 1);
+    assertEquals(19, range3.getInt4(0));
+    assertEquals(39, range3.getInt4(1));
+  }
+
+  @Test public void testIncrementOfInt8AndFinal() {
+    Schema schema = new Schema()
+        .addColumn("l_orderkey", Type.INT8)
+        .addColumn("l_linenumber", Type.INT8)
+        .addColumn("final", Type.INT8);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(3);
+    s.put(0, DatumFactory.createInt8(1));
+    s.put(1, DatumFactory.createInt8(1));
+    s.put(2, DatumFactory.createInt8(1));
+    VTuple e = new VTuple(3);
+    e.put(0, DatumFactory.createInt8(4)); // 4
+    e.put(1, DatumFactory.createInt8(2)); // 2
+    e.put(2, DatumFactory.createInt8(3)); //x3 = 24
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    assertEquals(24, partitioner.getTotalCardinality().longValue());
+
+    Tuple beforeOverflow = partitioner.increment(s, BigInteger.valueOf(5), 2);
+    assertEquals(1, beforeOverflow.getInt8(0));
+    assertEquals(2, beforeOverflow.getInt8(1));
+    assertEquals(3, beforeOverflow.getInt8(2));
+    Tuple overflow = partitioner.increment(beforeOverflow, 
BigInteger.valueOf(1), 2);
+    assertEquals(2, overflow.getInt8(0));
+    assertEquals(1, overflow.getInt8(1));
+    assertEquals(1, overflow.getInt8(2));
+  }
+
+  @Test
+  public void testIncrementOfFloat8() {
+    Schema schema = new Schema()
+        .addColumn("l_orderkey", Type.FLOAT8)
+        .addColumn("l_linenumber", Type.FLOAT8)
+        .addColumn("final", Type.FLOAT8);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(3);
+    s.put(0, DatumFactory.createFloat8(1.1d));
+    s.put(1, DatumFactory.createFloat8(1.1d));
+    s.put(2, DatumFactory.createFloat8(1.1d));
+    VTuple e = new VTuple(3);
+    e.put(0, DatumFactory.createFloat8(4.1d)); // 4
+    e.put(1, DatumFactory.createFloat8(2.1d)); // 2
+    e.put(2, DatumFactory.createFloat8(3.1d)); //x3 = 24
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    assertEquals(24, partitioner.getTotalCardinality().longValue());
+
+    Tuple beforeOverflow = partitioner.increment(s, BigInteger.valueOf(5), 2);
+    assertTrue(1.1d == beforeOverflow.getFloat8(0));
+    assertTrue(2.1d == beforeOverflow.getFloat8(1));
+    assertTrue(3.1d == beforeOverflow.getFloat8(2));
+    Tuple overflow = partitioner.increment(beforeOverflow, 
BigInteger.valueOf(1), 2);
+    assertTrue(2.1d == overflow.getFloat8(0));
+    assertTrue(1.1d == overflow.getFloat8(1));
+    assertTrue(1.1d == overflow.getFloat8(2));
+  }
+
+  @Test
+  public void testIncrementOfInet4() {
+    Schema schema = new Schema()
+        .addColumn("l_orderkey", Type.INET4)
+        .addColumn("l_linenumber", Type.INET4)
+        .addColumn("final", Type.INET4);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(3);
+    s.put(0, DatumFactory.createInet4("127.0.1.1"));
+    s.put(1, DatumFactory.createInet4("127.0.0.1"));
+    s.put(2, DatumFactory.createInet4("128.0.0.253"));
+    VTuple e = new VTuple(3);
+    e.put(0, DatumFactory.createInet4("127.0.1.4")); // 4
+    e.put(1, DatumFactory.createInet4("127.0.0.2")); // 2
+    e.put(2, DatumFactory.createInet4("128.0.0.255")); //x3 = 24
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, 
sortSpecs);
+    assertEquals(24, partitioner.getTotalCardinality().longValue());
+
+    Tuple beforeOverflow = partitioner.increment(s, BigInteger.valueOf(5), 2);
+    assertTrue("127.0.1.1".equals(beforeOverflow.getText(0)));
+    assertTrue("127.0.0.2".equals(beforeOverflow.getText(1)));
+    assertTrue("128.0.0.255".equals(beforeOverflow.getText(2)));
+    Tuple overflow = partitioner.increment(beforeOverflow, 
BigInteger.valueOf(1), 2);
+    assertTrue("127.0.1.2".equals(overflow.getText(0)));
+    assertTrue("127.0.0.1".equals(overflow.getText(1)));
+    assertTrue("128.0.0.253".equals(overflow.getText(2)));
+  }
+
+  @Test
+  public void testPartition() {
+    Schema schema = new Schema();
+    schema.addColumn("l_returnflag", Type.TEXT);
+    schema.addColumn("l_linestatus", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(2);
+    s.put(0, DatumFactory.createText("A"));
+    s.put(1, DatumFactory.createText("F"));
+    VTuple e = new VTuple(2);
+    e.put(0, DatumFactory.createText("R"));
+    e.put(1, DatumFactory.createText("O"));
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+    RangePartitionAlgorithm partitioner
+        = new UniformRangePartition(expected, sortSpecs, true);
+    TupleRange [] ranges = partitioner.partition(31);
+
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+  }
+
+  @Test
+  public void testPartitionForOnePartNum() {
+    Schema schema = new Schema()
+        .addColumn("l_returnflag", Type.TEXT)
+        .addColumn("l_linestatus", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(2);
+    s.put(0, DatumFactory.createText("A"));
+    s.put(1, DatumFactory.createText("F"));
+    VTuple e = new VTuple(2);
+    e.put(0, DatumFactory.createText("R"));
+    e.put(1, DatumFactory.createText("O"));
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+    RangePartitionAlgorithm partitioner =
+        new UniformRangePartition(expected, sortSpecs, true);
+    TupleRange [] ranges = partitioner.partition(1);
+
+    assertEquals(expected, ranges[0]);
+  }
+
+  @Test
+  public void testPartitionForOnePartNumWithOneOfTheValueNull() {
+    Schema schema = new Schema()
+        .addColumn("l_returnflag", Type.TEXT)
+        .addColumn("l_linestatus", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(2);
+    s.put(0, DatumFactory.createNullDatum());
+    s.put(1, DatumFactory.createText("F"));
+    VTuple e = new VTuple(2);
+    e.put(0, DatumFactory.createText("R"));
+    e.put(1, DatumFactory.createNullDatum());
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+    RangePartitionAlgorithm partitioner =
+        new UniformRangePartition(expected, sortSpecs, true);
+    TupleRange [] ranges = partitioner.partition(1);
+
+    assertEquals(expected, ranges[0]);
+  }
+
+  @Test
+  public void testPartitionForMultipleChars() {
+    Schema schema = new Schema()
+        .addColumn("KEY1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(1);
+    s.put(0, DatumFactory.createText("AAA"));
+    VTuple e = new VTuple(1);
+    e.put(0, DatumFactory.createText("ZZZ"));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+    RangePartitionAlgorithm partitioner =
+        new UniformRangePartition(expected, sortSpecs, true);
+    TupleRange [] ranges = partitioner.partition(48);
+
+    TupleRange prev = null;
+    for (int i = 0; i < ranges.length; i++) {
+      if (prev != null) {
+        assertTrue(i + "th, prev=" + prev + ",cur=" + ranges[i], 
prev.compareTo(ranges[i]) < 0);
+      }
+      prev = ranges[i];
+    }
+    assertEquals(48, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[47].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForMultipleChars2() {
+    Schema schema = new Schema()
+        .addColumn("KEY1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(1);
+    s.put(0, DatumFactory.createText("A1"));
+    VTuple e = new VTuple(1);
+    e.put(0, DatumFactory.createText("A999975"));
+
+    final int partNum = 2;
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+    RangePartitionAlgorithm partitioner =
+        new UniformRangePartition(expected, sortSpecs, true);
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForMultipleChars2Desc() {
+    Schema schema = new Schema()
+        .addColumn("KEY1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+    sortSpecs[0].setDescOrder();
+
+    VTuple s = new VTuple(1);
+    s.put(0, DatumFactory.createText("A999975"));
+    VTuple e = new VTuple(1);
+    e.put(0, DatumFactory.createText("A1"));
+
+    final int partNum = 48;
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+    RangePartitionAlgorithm partitioner =
+        new UniformRangePartition(expected, sortSpecs, true);
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForMultipleCharsWithSameFirstChar() {
+    Schema schema = new Schema()
+        .addColumn("KEY1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(1);
+    s.put(0, DatumFactory.createText("AAA"));
+    VTuple e = new VTuple(1);
+    e.put(0, DatumFactory.createText("AAZ"));
+
+    final int partNum = 4;
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+    RangePartitionAlgorithm partitioner =
+        new UniformRangePartition(expected, sortSpecs, true);
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForOnePartNumWithBothValueNull() {
+    Schema schema = new Schema()
+        .addColumn("l_returnflag", Type.TEXT)
+        .addColumn("l_linestatus", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(2);
+    s.put(0, DatumFactory.createNullDatum());
+    s.put(1, DatumFactory.createNullDatum());
+    VTuple e = new VTuple(2);
+    e.put(0, DatumFactory.createNullDatum());
+    e.put(1, DatumFactory.createNullDatum());
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+    RangePartitionAlgorithm partitioner =
+        new UniformRangePartition(expected, sortSpecs, true);
+    TupleRange [] ranges = partitioner.partition(1);
+
+    assertEquals(expected, ranges[0]);
+  }
+
+  @Test
+  public void testPartitionWithNull() {
+    Schema schema = new Schema();
+    schema.addColumn("l_returnflag", Type.TEXT);
+    schema.addColumn("l_linestatus", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(2);
+    s.put(0, DatumFactory.createNullDatum());
+    s.put(1, DatumFactory.createText("F"));
+    VTuple e = new VTuple(2);
+    e.put(0, DatumFactory.createNullDatum());
+    e.put(1, DatumFactory.createText("O"));
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+    RangePartitionAlgorithm partitioner
+        = new UniformRangePartition(expected, sortSpecs, true);
+    TupleRange [] ranges = partitioner.partition(10);
+
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+  }
+
+  @Test
+  public void testPartitionWithINET4() {
+    Schema schema = new Schema();
+    schema.addColumn("l_returnflag", Type.INET4);
+    schema.addColumn("l_linestatus", Type.INET4);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    VTuple s = new VTuple(2);
+    s.put(0, DatumFactory.createInet4("127.0.1.10"));
+    s.put(1, DatumFactory.createInet4("127.0.2.10"));
+    VTuple e = new VTuple(2);
+    e.put(0, DatumFactory.createInet4("127.0.1.20"));
+    e.put(1, DatumFactory.createInet4("127.0.2.20"));
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+    RangePartitionAlgorithm partitioner
+        = new UniformRangePartition(expected, sortSpecs, true);
+    TupleRange [] ranges = partitioner.partition(10);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java
new file mode 100644
index 0000000..817d27a
--- /dev/null
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.junit.Test;
+
+import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestMasterPlan {
+
+  @Test
+  public void testConnect() {
+    MasterPlan masterPlan = new 
MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
+
+    ExecutionBlock eb1 = masterPlan.newExecutionBlock();
+    ExecutionBlock eb2 = masterPlan.newExecutionBlock();
+    ExecutionBlock eb3 = masterPlan.newExecutionBlock();
+
+    masterPlan.addConnect(eb1, eb2, ShuffleType.RANGE_SHUFFLE);
+    assertTrue(masterPlan.isConnected(eb1.getId(), eb2.getId()));
+    assertTrue(masterPlan.isReverseConnected(eb2.getId(), eb1.getId()));
+
+    masterPlan.addConnect(eb3, eb2, ShuffleType.RANGE_SHUFFLE);
+    assertTrue(masterPlan.isConnected(eb1.getId(), eb2.getId()));
+    assertTrue(masterPlan.isConnected(eb3.getId(), eb2.getId()));
+
+    assertTrue(masterPlan.isReverseConnected(eb2.getId(), eb1.getId()));
+    assertTrue(masterPlan.isReverseConnected(eb2.getId(), eb3.getId()));
+
+    masterPlan.disconnect(eb3, eb2);
+    assertFalse(masterPlan.isConnected(eb3, eb2));
+    assertFalse(masterPlan.isReverseConnected(eb2, eb3));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
new file mode 100644
index 0000000..71bad20
--- /dev/null
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.JoinNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestBNLJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/TestBNLJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private Path testDir;
+
+  private static int OUTER_TUPLE_NUM = 1000;
+  private static int INNER_TUPLE_NUM = 1000;
+
+  private TableDesc employee;
+  private TableDesc people;
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, 
testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+
+    Schema schema = new Schema();
+    schema.addColumn("managerid", Type.INT4);
+    schema.addColumn("empid", Type.INT4);
+    schema.addColumn("memid", Type.INT4);
+    schema.addColumn("deptname", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta("TEXT");
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(employeeMeta, schema, employeePath);
+    appender.init();
+    VTuple tuple = new VTuple(schema.size());
+    for (int i = 0; i < OUTER_TUPLE_NUM; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+    employee = CatalogUtil.newTableDesc("default.employee", schema, 
employeeMeta, employeePath);
+    catalog.createTable(employee);
+
+    Schema peopleSchema = new Schema();
+    peopleSchema.addColumn("empid", Type.INT4);
+    peopleSchema.addColumn("fk_memid", Type.INT4);
+    peopleSchema.addColumn("name", Type.TEXT);
+    peopleSchema.addColumn("age", Type.INT4);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta("TEXT");
+    Path peoplePath = new Path(testDir, "people.csv");
+    appender = ((FileTablespace) 
TablespaceManager.getLocalFs()).getAppender(peopleMeta, peopleSchema, 
peoplePath);
+    appender.init();
+    tuple = new VTuple(peopleSchema.size());
+    for (int i = 1; i < INNER_TUPLE_NUM; i += 2) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("name_" + i),
+          DatumFactory.createInt4(30 + i) });
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+
+    people = CatalogUtil.newTableDesc("default.people", peopleSchema, 
peopleMeta, peoplePath);
+    catalog.createTable(people);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  // employee (managerId, empId, memId, deptName)
+  // people (empId, fk_memId, name, age)
+  String[] QUERIES = {
+      "select managerId, e.empId, deptName, e.memId from employee as e, people 
p",
+      "select managerId, e.empId, deptName, e.memId from employee as e " +
+          "inner join people as p on e.empId = p.empId and e.memId = 
p.fk_memId" };
+
+  @Test
+  public final void testBNLCrossJoin() throws IOException, TajoException {
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = 
planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf),
+        expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), 
JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
+
+    FileFragment[] empFrags = FileTablespace.splitNG(conf, "default.e", 
employee.getMeta(),
+        new Path(employee.getUri()),
+        Integer.MAX_VALUE);
+    FileFragment[] peopleFrags = FileTablespace.splitNG(conf, "default.p", 
people.getMeta(),
+        new Path(people.getUri()),
+        Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+    Path workDir = 
CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/testBNLCrossJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof BNLJoinExec);
+
+    int i = 0;
+    exec.init();
+    while (exec.next() != null) {
+      i++;
+    }
+    exec.close();
+    assertEquals(OUTER_TUPLE_NUM * INNER_TUPLE_NUM / 2, i); // expected 10 * 5
+  }
+
+  @Test
+  public final void testBNLInnerJoin() throws IOException, TajoException {
+    Expr context = analyzer.parse(QUERIES[1]);
+    LogicalNode plan = 
planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf),
+        context).getRootBlock().getRoot();
+
+    FileFragment[] empFrags = FileTablespace.splitNG(conf, "default.e", 
employee.getMeta(),
+        new Path(employee.getUri()), Integer.MAX_VALUE);
+    FileFragment[] peopleFrags = FileTablespace.splitNG(conf, "default.p", 
people.getMeta(),
+        new Path(people.getUri()), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), 
JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
+
+    Path workDir = 
CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/testBNLInnerJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(),
+        merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof BNLJoinExec);
+
+    Tuple tuple;
+    int i = 1;
+    int count = 0;
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      count++;
+      assertTrue(i == tuple.getInt4(0));
+      assertTrue(i == tuple.getInt4(1));
+      assertTrue(("dept_" + i).equals(tuple.getText(2)));
+      assertTrue(10 + i == tuple.getInt4(3));
+      i += 2;
+    }
+    exec.close();
+    assertEquals(INNER_TUPLE_NUM / 2, count);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
new file mode 100644
index 0000000..4ae880f
--- /dev/null
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestExternalSortExec {
+  private TajoConf conf;
+  private TajoTestingCluster util;
+  private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/TestExternalSortExec";
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private Path testDir;
+
+  private final int numTuple = 100000;
+  private Random rnd = new Random(System.currentTimeMillis());
+
+  private TableDesc employee;
+
+  @Before
+  public void setUp() throws Exception {
+    this.conf = new TajoConf();
+    util = new TajoTestingCluster();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, 
testDir.toUri().toString());
+    catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, 
DEFAULT_TABLESPACE_NAME);
+    conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString());
+
+    Schema schema = new Schema();
+    schema.addColumn("managerid", Type.INT4);
+    schema.addColumn("empid", Type.INT4);
+    schema.addColumn("deptname", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta("TEXT");
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(employeeMeta, schema, employeePath);
+    appender.enableStats();
+    appender.init();
+    VTuple tuple = new VTuple(schema.size());
+    for (int i = 0; i < numTuple; i++) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(rnd.nextInt(50)),
+          DatumFactory.createInt4(rnd.nextInt(100)),
+          DatumFactory.createText("dept_" + i),
+      });
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+
+    employee = new TableDesc("default.employee", schema, employeeMeta, 
employeePath.toUri());
+    catalog.createTable(employee);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    CommonTestingUtil.cleanupTestDir(TEST_PATH);
+    util.shutdownCatalogCluster();
+  }
+
+  String[] QUERIES = {
+      "select managerId, empId from employee order by managerId, empId"
+  };
+
+  @Test
+  public final void testNext() throws IOException, TajoException {
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", 
employee.getMeta(),
+        new Path(employee.getUri()), Integer.MAX_VALUE);
+    Path workDir = new Path(testDir, TestExternalSortExec.class.getName());
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), new FileFragment[] { 
frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = 
planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    
+    ProjectionExec proj = (ProjectionExec) exec;
+
+    // TODO - should be planed with user's optimization hint
+    ExternalSortExec extSort;
+    if (!(proj.getChild() instanceof ExternalSortExec)) {
+      UnaryPhysicalExec sortExec = proj.getChild();
+      SeqScanExec scan = sortExec.getChild();
+
+      extSort = new ExternalSortExec(ctx, ((MemSortExec)sortExec).getPlan(), 
scan);
+      proj.setChild(extSort);
+    } else {
+      extSort = proj.getChild();
+    }
+    extSort.setSortBufferBytesNum(1024*1024);
+
+    Tuple tuple;
+    Tuple preVal = null;
+    Tuple curVal;
+    int cnt = 0;
+    exec.init();
+    long start = System.currentTimeMillis();
+    BaseTupleComparator comparator = new BaseTupleComparator(proj.getSchema(),
+        new SortSpec[]{
+            new SortSpec(new Column("managerid", Type.INT4)),
+            new SortSpec(new Column("empid", Type.INT4))
+        });
+
+    while ((tuple = exec.next()) != null) {
+      curVal = tuple;
+      if (preVal != null) {
+        assertTrue("prev: " + preVal + ", but cur: " + curVal, 
comparator.compare(preVal, curVal) <= 0);
+      }
+      preVal = new VTuple(curVal);
+      cnt++;
+    }
+    long end = System.currentTimeMillis();
+    assertEquals(numTuple, cnt);
+
+    // for rescan test
+    preVal = null;
+    exec.rescan();
+    cnt = 0;
+    while ((tuple = exec.next()) != null) {
+      curVal = tuple;
+      if (preVal != null) {
+        assertTrue("prev: " + preVal + ", but cur: " + curVal, 
comparator.compare(preVal, curVal) <= 0);
+      }
+      preVal = curVal;
+      cnt++;
+    }
+    assertEquals(numTuple, cnt);
+    exec.close();
+    System.out.println("Sort Time: " + (end - start) + " msc");
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
new file mode 100644
index 0000000..5901c49
--- /dev/null
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
@@ -0,0 +1,414 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.JoinNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.Appender;
+import org.apache.tajo.storage.FileTablespace;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.*;
+
+public class TestFullOuterHashJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/TestFullOuterHashJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private Path testDir;
+  private QueryContext defaultContext;
+
+  private TableDesc dep3;
+  private TableDesc job3;
+  private TableDesc emp3;
+  private TableDesc phone3;
+
+  private final String DEP3_NAME = 
CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep3");
+  private final String JOB3_NAME = 
CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "job3");
+  private final String EMP3_NAME = 
CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "emp3");
+  private final String PHONE3_NAME = 
CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "phone3");
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, 
testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+
+    //----------------- dep3 ------------------------------
+    // dep_id | dep_name  | loc_id
+    //--------------------------------
+    //  0     | dep_0     | 1000
+    //  1     | dep_1     | 1001
+    //  2     | dep_2     | 1002
+    //  3     | dep_3     | 1003
+    //  4     | dep_4     | 1004
+    //  5     | dep_5     | 1005
+    //  6     | dep_6     | 1006
+    //  7     | dep_7     | 1007
+    //  8     | dep_8     | 1008
+    //  9     | dep_9     | 1009
+    Schema dep3Schema = new Schema();
+    dep3Schema.addColumn("dep_id", Type.INT4);
+    dep3Schema.addColumn("dep_name", Type.TEXT);
+    dep3Schema.addColumn("loc_id", Type.INT4);
+
+
+    TableMeta dep3Meta = CatalogUtil.newTableMeta("TEXT");
+    Path dep3Path = new Path(testDir, "dep3.csv");
+    Appender appender1 = ((FileTablespace) 
TablespaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path);
+    appender1.init();
+    VTuple tuple = new VTuple(dep3Schema.size());
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createText("dept_" + i),
+          DatumFactory.createInt4(1000 + i) });
+      appender1.addTuple(tuple);
+    }
+
+    appender1.flush();
+    appender1.close();
+    dep3 = CatalogUtil.newTableDesc(DEP3_NAME, dep3Schema, dep3Meta, dep3Path);
+    catalog.createTable(dep3);
+
+    //----------------- job3 ------------------------------
+    //  job_id  | job_title
+    // ----------------------
+    //   101    |  job_101
+    //   102    |  job_102
+    //   103    |  job_103
+
+    Schema job3Schema = new Schema();
+    job3Schema.addColumn("job_id", Type.INT4);
+    job3Schema.addColumn("job_title", Type.TEXT);
+
+
+    TableMeta job3Meta = CatalogUtil.newTableMeta("TEXT");
+    Path job3Path = new Path(testDir, "job3.csv");
+    Appender appender2 = ((FileTablespace) 
TablespaceManager.getLocalFs()).getAppender(job3Meta, job3Schema, job3Path);
+    appender2.init();
+    VTuple tuple2 = new VTuple(job3Schema.size());
+    for (int i = 1; i < 4; i++) {
+      int x = 100 + i;
+      tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
+          DatumFactory.createText("job_" + x) });
+      appender2.addTuple(tuple2);
+    }
+
+    appender2.flush();
+    appender2.close();
+    job3 = CatalogUtil.newTableDesc(JOB3_NAME, job3Schema, job3Meta, job3Path);
+    catalog.createTable(job3);
+
+
+
+    //---------------------emp3 --------------------
+    // emp_id  | first_name | last_name | dep_id | salary | job_id
+    // ------------------------------------------------------------
+    //  11     |  fn_11     |  ln_11    |  1     | 123    | 101
+    //  13     |  fn_13     |  ln_13    |  3     | 369    | 103
+    //  15     |  fn_15     |  ln_15    |  5     | 615    | null
+    //  17     |  fn_17     |  ln_17    |  7     | 861    | null
+    //  19     |  fn_19     |  ln_19    |  9     | 1107   | null
+    //  21     |  fn_21     |  ln_21    |  1     | 123    | 101
+    //  23     |  fn_23     |  ln_23    |  3     | 369    | 103
+
+    Schema emp3Schema = new Schema();
+    emp3Schema.addColumn("emp_id", Type.INT4);
+    emp3Schema.addColumn("first_name", Type.TEXT);
+    emp3Schema.addColumn("last_name", Type.TEXT);
+    emp3Schema.addColumn("dep_id", Type.INT4);
+    emp3Schema.addColumn("salary", Type.FLOAT4);
+    emp3Schema.addColumn("job_id", Type.INT4);
+
+
+    TableMeta emp3Meta = CatalogUtil.newTableMeta("TEXT");
+    Path emp3Path = new Path(testDir, "emp3.csv");
+    Appender appender3 = ((FileTablespace) 
TablespaceManager.getLocalFs()).getAppender(emp3Meta, emp3Schema, emp3Path);
+    appender3.init();
+    VTuple tuple3 = new VTuple(emp3Schema.size());
+
+    for (int i = 1; i < 4; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+
+      int y = 20 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i),
+          DatumFactory.createText("firstname_" + y),
+          DatumFactory.createText("lastname_" + y),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+    }
+
+    for (int i = 5; i < 10; i += 2) {
+      int x= 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createNullDatum() });
+      appender3.addTuple(tuple3);
+    }
+
+    appender3.flush();
+    appender3.close();
+    emp3 = CatalogUtil.newTableDesc(EMP3_NAME, emp3Schema, emp3Meta, emp3Path);
+    catalog.createTable(emp3);
+
+    //---------------------phone3 --------------------
+    // emp_id  | phone_number
+    // -----------------------------------------------
+    // this table is empty, no rows
+
+    Schema phone3Schema = new Schema();
+    phone3Schema.addColumn("emp_id", Type.INT4);
+    phone3Schema.addColumn("phone_number", Type.TEXT);
+
+
+    TableMeta phone3Meta = CatalogUtil.newTableMeta("TEXT");
+    Path phone3Path = new Path(testDir, "phone3.csv");
+    Appender appender5 = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(phone3Meta, phone3Schema, phone3Path);
+    appender5.init();
+
+    appender5.flush();
+    appender5.close();
+    phone3 = CatalogUtil.newTableDesc(PHONE3_NAME, phone3Schema, phone3Meta, 
phone3Path);
+    catalog.createTable(phone3);
+
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+
+    defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  String[] QUERIES = {
+      // [0] no nulls
+      "select dep3.dep_id, dep_name, emp_id, salary from dep3 full outer join 
emp3 on dep3.dep_id = emp3.dep_id",
+      // [1] nulls on the right operand
+      "select job3.job_id, job_title, emp_id, salary from job3 full outer join 
emp3 on job3.job_id=emp3.job_id",
+      // [2] nulls on the left side
+      "select job3.job_id, job_title, emp_id, salary from emp3 full outer join 
job3 on job3.job_id=emp3.job_id",
+      // [3] one operand is empty
+      "select emp3.emp_id, first_name, phone_number from emp3 full outer join 
phone3 on emp3.emp_id = phone3.emp_id"
+  };
+
+  @Test
+  public final void testFullOuterHashJoinExec0() throws IOException, 
TajoException {
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(defaultContext, 
expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), 
JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+    FileFragment[] dep3Frags = FileTablespace.splitNG(conf, DEP3_NAME, 
dep3.getMeta(), new Path(dep3.getUri()),
+        Integer.MAX_VALUE);
+    FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, 
emp3.getMeta(), new Path(emp3.getUri()),
+        Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
+
+    Path workDir = 
CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/TestFullOuterHashJoinExec0");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof HashFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(12, count);
+
+  }
+
+
+  @Test
+  public final void testFullOuterHashJoinExec1() throws IOException, 
TajoException {
+    Expr expr = analyzer.parse(QUERIES[1]);
+    LogicalNode plan = planner.createPlan(defaultContext, 
expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), 
JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+    FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, 
job3.getMeta(), new Path(job3.getUri()),
+        Integer.MAX_VALUE);
+    FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, 
emp3.getMeta(), new Path(emp3.getUri()),
+        Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+    Path workDir = 
CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/TestFullOuter_HashJoinExec1");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof HashFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(8, count);
+
+  }
+
+  @Test
+  public final void testFullOuterHashJoinExec2() throws IOException, 
TajoException {
+    Expr expr = analyzer.parse(QUERIES[2]);
+    LogicalNode plan = planner.createPlan(defaultContext, 
expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), 
JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+    FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, 
emp3.getMeta(), new Path(emp3.getUri()),
+        Integer.MAX_VALUE);
+    FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, 
job3.getMeta(), new Path(job3.getUri()),
+        Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
+
+    Path workDir = 
CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/TestFullOuterHashJoinExec2");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof HashFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(8, count);
+
+  }
+
+
+  @Test
+  public final void testFullOuterHashJoinExec3() throws IOException, 
TajoException {
+    Expr expr = analyzer.parse(QUERIES[3]);
+    LogicalNode plan = planner.createPlan(defaultContext, 
expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), 
JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+    FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, 
emp3.getMeta(), new Path(emp3.getUri()),
+        Integer.MAX_VALUE);
+    FileFragment[] phone3Frags = FileTablespace.splitNG(conf, PHONE3_NAME, 
phone3.getMeta(), new Path(phone3.getUri()),
+        Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+
+    Path workDir = 
CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/TestFullOuterHashJoinExec3");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged,
+        workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof HashFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(7, count);
+  }
+}

Reply via email to