http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
new file mode 100644
index 0000000..ec2aa04
--- /dev/null
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
@@ -0,0 +1,537 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.JoinNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.Appender;
+import org.apache.tajo.storage.FileTablespace;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.*;
+
+public class TestFullOuterMergeJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/TestFullOuterMergeJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private Path testDir;
+  private QueryContext defaultContext;
+
+  private TableDesc dep3;
+  private TableDesc dep4;
+  private TableDesc job3;
+  private TableDesc emp3;
+  private TableDesc phone3;
+
+  private final String DEP3_NAME = 
CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep3");
+  private final String DEP4_NAME = 
CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep4");
+  private final String JOB3_NAME = 
CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "job3");
+  private final String EMP3_NAME = 
CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "emp3");
+  private final String PHONE3_NAME = 
CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "phone3");
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, 
testDir.toUri().toString());
+    catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, 
DEFAULT_TABLESPACE_NAME);
+
+    conf = util.getConfiguration();
+
+    //----------------- dep3 ------------------------------
+    // dep_id | dep_name  | loc_id
+    //--------------------------------
+    //  0     | dep_0     | 1000
+    //  1     | dep_1     | 1001
+    //  2     | dep_2     | 1002
+    //  3     | dep_3     | 1003
+    //  4     | dep_4     | 1004
+    //  5     | dep_5     | 1005
+    //  6     | dep_6     | 1006
+    //  7     | dep_7     | 1007
+    //  8     | dep_8     | 1008
+    //  9     | dep_9     | 1009
+    Schema dep3Schema = new Schema();
+    dep3Schema.addColumn("dep_id", Type.INT4);
+    dep3Schema.addColumn("dep_name", Type.TEXT);
+    dep3Schema.addColumn("loc_id", Type.INT4);
+
+
+    TableMeta dep3Meta = CatalogUtil.newTableMeta("TEXT");
+    Path dep3Path = new Path(testDir, "dep3.csv");
+    Appender appender1 = ((FileTablespace) 
TablespaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path);
+    appender1.init();
+    VTuple tuple = new VTuple(dep3Schema.size());
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createText("dept_" + i),
+          DatumFactory.createInt4(1000 + i) });
+      appender1.addTuple(tuple);
+    }
+
+    appender1.flush();
+    appender1.close();
+    dep3 = CatalogUtil.newTableDesc(DEP3_NAME, dep3Schema, dep3Meta, dep3Path);
+    catalog.createTable(dep3);
+
+
+    //----------------- dep4 ------------------------------
+    // dep_id | dep_name  | loc_id
+    //--------------------------------
+    //  0     | dep_0     | 1000
+    //  1     | dep_1     | 1001
+    //  2     | dep_2     | 1002
+    //  3     | dep_3     | 1003
+    //  4     | dep_4     | 1004
+    //  5     | dep_5     | 1005
+    //  6     | dep_6     | 1006
+    //  7     | dep_7     | 1007
+    //  8     | dep_8     | 1008
+    //  9     | dep_9     | 1009
+    // 10     | dep_10    | 1010
+    Schema dep4Schema = new Schema();
+    dep4Schema.addColumn("dep_id", Type.INT4);
+    dep4Schema.addColumn("dep_name", Type.TEXT);
+    dep4Schema.addColumn("loc_id", Type.INT4);
+
+
+    TableMeta dep4Meta = CatalogUtil.newTableMeta("TEXT");
+    Path dep4Path = new Path(testDir, "dep4.csv");
+    Appender appender4 = ((FileTablespace) 
TablespaceManager.getLocalFs()).getAppender(dep4Meta, dep4Schema, dep4Path);
+    appender4.init();
+    VTuple tuple4 = new VTuple(dep4Schema.size());
+    for (int i = 0; i < 11; i++) {
+      tuple4.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createText("dept_" + i),
+          DatumFactory.createInt4(1000 + i) });
+      appender4.addTuple(tuple4);
+    }
+
+    appender4.flush();
+    appender4.close();
+    dep4 = CatalogUtil.newTableDesc(DEP4_NAME, dep4Schema, dep4Meta, dep4Path);
+    catalog.createTable(dep4);
+
+
+
+    //----------------- job3 ------------------------------
+    //  job_id  | job_title
+    // ----------------------
+    //   101    |  job_101
+    //   102    |  job_102
+    //   103    |  job_103
+
+    Schema job3Schema = new Schema();
+    job3Schema.addColumn("job_id", Type.INT4);
+    job3Schema.addColumn("job_title", Type.TEXT);
+
+
+    TableMeta job3Meta = CatalogUtil.newTableMeta("TEXT");
+    Path job3Path = new Path(testDir, "job3.csv");
+    Appender appender2 = ((FileTablespace) 
TablespaceManager.getLocalFs()).getAppender(job3Meta, job3Schema, job3Path);
+    appender2.init();
+    VTuple tuple2 = new VTuple(job3Schema.size());
+    for (int i = 1; i < 4; i++) {
+      int x = 100 + i;
+      tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
+          DatumFactory.createText("job_" + x) });
+      appender2.addTuple(tuple2);
+    }
+
+    appender2.flush();
+    appender2.close();
+    job3 = CatalogUtil.newTableDesc(JOB3_NAME, job3Schema, job3Meta, job3Path);
+    catalog.createTable(job3);
+
+
+
+    //---------------------emp3 --------------------
+    // emp_id  | first_name | last_name | dep_id | salary | job_id
+    // ------------------------------------------------------------
+    //  11     |  fn_11     |  ln_11    |  1     | 123    | 101
+    //  13     |  fn_13     |  ln_13    |  3     | 369    | 103
+    //  15     |  fn_15     |  ln_15    |  5     | 615    | null
+    //  17     |  fn_17     |  ln_17    |  7     | 861    | null
+    //  19     |  fn_19     |  ln_19    |  9     | 1107   | null
+    //  21     |  fn_21     |  ln_21    |  1     | 123    | 101
+    //  23     |  fn_23     |  ln_23    |  3     | 369    | 103
+
+    Schema emp3Schema = new Schema();
+    emp3Schema.addColumn("emp_id", Type.INT4);
+    emp3Schema.addColumn("first_name", Type.TEXT);
+    emp3Schema.addColumn("last_name", Type.TEXT);
+    emp3Schema.addColumn("dep_id", Type.INT4);
+    emp3Schema.addColumn("salary", Type.FLOAT4);
+    emp3Schema.addColumn("job_id", Type.INT4);
+
+
+    TableMeta emp3Meta = CatalogUtil.newTableMeta("TEXT");
+    Path emp3Path = new Path(testDir, "emp3.csv");
+    Appender appender3 = ((FileTablespace) 
TablespaceManager.getLocalFs()).getAppender(emp3Meta, emp3Schema, emp3Path);
+    appender3.init();
+    VTuple tuple3 = new VTuple(emp3Schema.size());
+
+    for (int i = 1; i < 4; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+
+      int y = 20 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i),
+          DatumFactory.createText("firstname_" + y),
+          DatumFactory.createText("lastname_" + y),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+    }
+
+    for (int i = 5; i < 10; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createNullDatum() });
+      appender3.addTuple(tuple3);
+    }
+
+    appender3.flush();
+    appender3.close();
+    emp3 = CatalogUtil.newTableDesc(EMP3_NAME, emp3Schema, emp3Meta, emp3Path);
+    catalog.createTable(emp3);
+
+    //---------------------phone3 --------------------
+    // emp_id  | phone_number
+    // -----------------------------------------------
+    // this table is empty, no rows
+
+    Schema phone3Schema = new Schema();
+    phone3Schema.addColumn("emp_id", Type.INT4);
+    phone3Schema.addColumn("phone_number", Type.TEXT);
+
+
+    TableMeta phone3Meta = CatalogUtil.newTableMeta("TEXT");
+    Path phone3Path = new Path(testDir, "phone3.csv");
+    Appender appender5 = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(phone3Meta, phone3Schema, phone3Path);
+    appender5.init();
+    appender5.flush();
+    appender5.close();
+    phone3 = CatalogUtil.newTableDesc(PHONE3_NAME, phone3Schema, phone3Meta, 
phone3Path);
+    catalog.createTable(phone3);
+
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+
+    defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  String[] QUERIES = {
+      // [0] no nulls
+      "select dep3.dep_id, dep_name, emp_id, salary from emp3 full outer join 
dep3 on dep3.dep_id = emp3.dep_id",
+      // [1] nulls on the left operand
+      "select job3.job_id, job_title, emp_id, salary from emp3 full outer join 
job3 on job3.job_id=emp3.job_id",
+      // [2] nulls on the right side
+      "select job3.job_id, job_title, emp_id, salary from job3 full outer join 
emp3 on job3.job_id=emp3.job_id",
+      // [3] no nulls, right continues after left
+      "select dep4.dep_id, dep_name, emp_id, salary from emp3 full outer join 
dep4 on dep4.dep_id = emp3.dep_id",
+      // [4] one operand is empty
+      "select emp3.emp_id, first_name, phone_number from emp3 full outer join 
phone3 on emp3.emp_id = phone3.emp_id",
+      // [5] one operand is empty
+      "select emp3.emp_id, first_name, phone_number from phone3 full outer 
join emp3 on emp3.emp_id = phone3.emp_id",
+  };
+
+  @Test
+  public final void testFullOuterMergeJoin0() throws IOException, 
TajoException {
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(defaultContext, 
expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new 
Path(emp3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] dep3Frags =
+        FileTablespace.splitNG(conf, DEP3_NAME, dep3.getMeta(), new 
Path(dep3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
+
+    Path workDir = 
CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/testFullOuterMergeJoin0");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(12, count);
+  }
+
+
+  @Test
+  public final void testFullOuterMergeJoin1() throws IOException, 
TajoException {
+    Expr expr = analyzer.parse(QUERIES[1]);
+    LogicalNode plan = planner.createPlan(defaultContext, 
expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new 
Path(emp3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] job3Frags =
+        FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new 
Path(job3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+    Path workDir = 
CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/testFullOuterMergeJoin1");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(8, count);
+  }
+
+  @Test
+  public final void testFullOuterMergeJoin2() throws IOException, 
TajoException {
+    Expr expr = analyzer.parse(QUERIES[2]);
+    LogicalNode plan = planner.createPlan(defaultContext, 
expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new 
Path(emp3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] job3Frags =
+        FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new 
Path(job3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+    Path workDir = 
CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/testFullOuterMergeJoin2");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
+
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(8, count);
+  }
+
+  @Test
+  public final void testFullOuterMergeJoin3() throws IOException, 
TajoException {
+    Expr expr = analyzer.parse(QUERIES[3]);
+    LogicalNode plan = planner.createPlan(defaultContext, 
expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new 
Path(emp3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] dep4Frags =
+        FileTablespace.splitNG(conf, DEP4_NAME, dep4.getMeta(), new 
Path(dep4.getUri()), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, dep4Frags);
+
+    Path workDir = 
CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/testFullOuterMergeJoin3");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+
+    // if it chose the hash join WITH REVERSED ORDER, convert to merge join 
exec
+    assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(13, count);
+  }
+
+
+  @Test
+  public final void testFullOuterMergeJoin4() throws IOException, 
TajoException {
+    Expr expr = analyzer.parse(QUERIES[4]);
+    LogicalNode plan = planner.createPlan(defaultContext, 
expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new 
Path(emp3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] phone3Frags =
+        FileTablespace.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new 
Path(phone3.getUri()),
+            Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+
+    Path workDir = 
CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/testFullOuterMergeJoin4");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(7, count);
+  }
+
+
+  @Test
+  public final void testFullOuterMergeJoin5() throws IOException, 
TajoException {
+    Expr expr = analyzer.parse(QUERIES[5]);
+    LogicalNode plan = planner.createPlan(defaultContext, 
expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new 
Path(emp3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] phone3Frags =
+        FileTablespace.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new 
Path(phone3.getUri()),
+            Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(phone3Frags,emp3Frags);
+
+    Path workDir = 
CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/testFullOuterMergeJoin5");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(7, count);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
new file mode 100644
index 0000000..b9ba2de
--- /dev/null
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.plan.LogicalOptimizer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestHashAntiJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/TestHashJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private LogicalOptimizer optimizer;
+  private Path testDir;
+  private QueryContext queryContext;
+
+  private TableDesc employee;
+  private TableDesc people;
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, 
testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+
+    Schema employeeSchema = new Schema();
+    employeeSchema.addColumn("managerid", Type.INT4);
+    employeeSchema.addColumn("empid", Type.INT4);
+    employeeSchema.addColumn("memid", Type.INT4);
+    employeeSchema.addColumn("deptname", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta("TEXT");
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(employeeMeta, employeeSchema, employeePath);
+    appender.init();
+    VTuple tuple = new VTuple(employeeSchema.size());
+
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), // empid [0-9]
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+    employee = CatalogUtil.newTableDesc("default.employee", employeeSchema, 
employeeMeta, employeePath);
+    catalog.createTable(employee);
+
+    Schema peopleSchema = new Schema();
+    peopleSchema.addColumn("empid", Type.INT4);
+    peopleSchema.addColumn("fk_memid", Type.INT4);
+    peopleSchema.addColumn("name", Type.TEXT);
+    peopleSchema.addColumn("age", Type.INT4);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta("TEXT");
+    Path peoplePath = new Path(testDir, "people.csv");
+    appender = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender.init();
+    tuple = new VTuple(peopleSchema.size());
+    for (int i = 1; i < 10; i += 2) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(i), // empid [1, 3, 5, 7, 9]
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("name_" + i),
+          DatumFactory.createInt4(30 + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+
+    queryContext = new QueryContext(conf);
+    people = CatalogUtil.newTableDesc("default.people", peopleSchema, 
peopleMeta, peoplePath);
+    catalog.createTable(people);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+    optimizer = new LogicalOptimizer(conf, catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+
+  // relation descriptions
+  // employee (managerid, empid, memid, deptname)
+  // people (empid, fk_memid, name, age)
+
+  String[] QUERIES = {
+      "select managerId, e.empId, deptName, e.memId from employee as e, people 
as p where e.empId = p.empId"
+  };
+
+  @Test
+  public final void testHashAntiJoin() throws IOException, TajoException {
+    FileFragment[] empFrags = FileTablespace.splitNG(conf, "default.e", 
employee.getMeta(),
+        new Path(employee.getUri()), Integer.MAX_VALUE);
+    FileFragment[] peopleFrags = FileTablespace.splitNG(conf, "default.p", 
people.getMeta(),
+        new Path(people.getUri()), Integer.MAX_VALUE);
+
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = 
CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/testHashAntiJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(queryContext,
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = 
planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
+    optimizer.optimize(plan);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    // replace an equal join with an hash anti join.
+    if (exec instanceof MergeJoinExec) {
+      MergeJoinExec join = (MergeJoinExec) exec;
+      ExternalSortExec sortLeftChild = (ExternalSortExec) join.getLeftChild();
+      ExternalSortExec sortRightChild = (ExternalSortExec) 
join.getRightChild();
+      SeqScanExec scanLeftChild = sortLeftChild.getChild();
+      SeqScanExec scanRightChild = sortRightChild.getChild();
+
+      // 'people' should be outer table. So, the below code guarantees that 
people becomes the outer table.
+      if (scanLeftChild.getTableName().equals("default.people")) {
+        exec = new HashLeftAntiJoinExec(ctx, join.getPlan(), scanRightChild, 
scanLeftChild);
+      } else {
+        exec = new HashLeftAntiJoinExec(ctx, join.getPlan(), scanLeftChild, 
scanRightChild);
+      }
+    } else if (exec instanceof HashJoinExec) {
+      HashJoinExec join = (HashJoinExec) exec;
+      SeqScanExec scanLeftChild = (SeqScanExec) join.getLeftChild();
+
+      // 'people' should be outer table. So, the below code guarantees that 
people becomes the outer table.
+      if (scanLeftChild.getTableName().equals("default.people")) {
+        exec = new HashLeftAntiJoinExec(ctx, join.getPlan(), 
join.getRightChild(), join.getLeftChild());
+      } else {
+        exec = new HashLeftAntiJoinExec(ctx, join.getPlan(), 
join.getLeftChild(), join.getRightChild());
+      }
+    }
+
+    Tuple tuple;
+    int count = 0;
+    int i = 0;
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      count++;
+      assertTrue(i == tuple.getInt4(0));
+      assertTrue(i == tuple.getInt4(1)); // expected empid [0, 2, 4, 6, 8]
+      assertTrue(("dept_" + i).equals(tuple.getText(2)));
+      assertTrue(10 + i == tuple.getInt4(3));
+
+      i += 2;
+    }
+    exec.close();
+    assertEquals(5 , count); // the expected result : [0, 2, 4, 6, 8]
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
new file mode 100644
index 0000000..4550db9
--- /dev/null
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.JoinNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.*;
+
+public class TestHashJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/TestHashJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private Path testDir;
+  private QueryContext defaultContext;
+
+  private TableDesc employee;
+  private TableDesc people;
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, 
testDir.toUri().toString());
+    catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, 
DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+
+    Schema employeeSchema = new Schema();
+    employeeSchema.addColumn("managerid", Type.INT4);
+    employeeSchema.addColumn("empid", Type.INT4);
+    employeeSchema.addColumn("memid", Type.INT4);
+    employeeSchema.addColumn("deptname", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta("TEXT");
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(employeeMeta, employeeSchema, employeePath);
+    appender.init();
+    VTuple tuple = new VTuple(employeeSchema.size());
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+    employee = CatalogUtil.newTableDesc("default.employee", employeeSchema, 
employeeMeta, employeePath);
+    catalog.createTable(employee);
+
+    Schema peopleSchema = new Schema();
+    peopleSchema.addColumn("empid", Type.INT4);
+    peopleSchema.addColumn("fk_memid", Type.INT4);
+    peopleSchema.addColumn("name", Type.TEXT);
+    peopleSchema.addColumn("age", Type.INT4);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta("TEXT");
+    Path peoplePath = new Path(testDir, "people.csv");
+    appender = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender.init();
+    tuple = new VTuple(peopleSchema.size());
+    for (int i = 1; i < 10; i += 2) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("name_" + i),
+          DatumFactory.createInt4(30 + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+
+    people = CatalogUtil.newTableDesc("default.people", peopleSchema, 
peopleMeta, peoplePath);
+    catalog.createTable(people);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+    defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  String[] QUERIES = {
+      "select managerId, e.empId, deptName, e.memId from employee as e inner 
join " +
+          "people as p on e.empId = p.empId and e.memId = p.fk_memId"
+  };
+
+  @Test
+  public final void testHashInnerJoin() throws IOException, TajoException {
+
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(defaultContext, 
expr).getRootBlock().getRoot();
+
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), 
JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+    FileFragment[] empFrags = FileTablespace.splitNG(conf, "default.e", 
employee.getMeta(),
+        new Path(employee.getUri()), Integer.MAX_VALUE);
+    FileFragment[] peopleFrags = FileTablespace.splitNG(conf, "default.p", 
people.getMeta(),
+        new Path(people.getUri()), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = 
CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/testHashInnerJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof HashJoinExec);
+
+    Tuple tuple;
+    int count = 0;
+    int i = 1;
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      count++;
+      assertTrue(i == tuple.getInt4(0));
+      assertTrue(i == tuple.getInt4(1));
+      assertTrue(("dept_" + i).equals(tuple.getText(2)));
+      assertTrue(10 + i == tuple.getInt4(3));
+
+      i += 2;
+    }
+    exec.close();
+    assertEquals(10 / 2, count);
+  }
+
+  @Test
+  public final void testCheckIfInMemoryInnerJoinIsPossible() throws 
IOException, TajoException {
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(defaultContext, 
expr).getRootBlock().getRoot();
+
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), 
JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+    FileFragment[] peopleFrags = FileTablespace.splitNG(conf, "default.p", 
people.getMeta(),
+        new Path(people.getUri()), Integer.MAX_VALUE);
+    FileFragment[] empFrags = FileTablespace.splitNG(conf, "default.e", 
employee.getMeta(),
+        new Path(employee.getUri()), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = 
CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/testHashInnerJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    ctx.getQueryContext().setLong(SessionVars.HASH_JOIN_SIZE_LIMIT.keyname(), 
100l);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof HashJoinExec);
+    HashJoinExec joinExec = proj.getChild();
+
+    assertCheckInnerJoinRelatedFunctions(ctx, phyPlanner, joinNode, joinExec);
+  }
+
+  /**
+   * It checks inner-join related functions. It will return TRUE if left 
relations is smaller than right relations.
+   *
+   * The below unit tests will work according to which side is smaller. In 
this unit tests, we use two tables: p and e.
+   * The table p is 75 bytes, and the table e is 140 bytes. Since we cannot 
expect that which side is smaller,
+   * we use some boolean variable <code>leftSmaller</code> to indicate which 
side is small.
+   */
+  private static boolean 
assertCheckInnerJoinRelatedFunctions(TaskAttemptContext ctx,
+                                                       PhysicalPlannerImpl 
phyPlanner,
+                                                       JoinNode joinNode, 
BinaryPhysicalExec joinExec) throws
+      IOException {
+
+    String [] left = PlannerUtil.getRelationLineage(joinNode.getLeftChild());
+    String [] right = PlannerUtil.getRelationLineage(joinNode.getRightChild());
+
+    boolean leftSmaller;
+    if (left[0].equals("default.p")) {
+      leftSmaller = true;
+    } else {
+      leftSmaller = false;
+    }
+
+    long leftSize = phyPlanner.estimateSizeRecursive(ctx, left);
+    long rightSize = phyPlanner.estimateSizeRecursive(ctx, right);
+
+    // The table p is 75 bytes, and the table e is 140 bytes.
+    if (leftSmaller) { // if left one is smaller
+      assertEquals(75, leftSize);
+      assertEquals(140, rightSize);
+    } else { // if right one is smaller
+      assertEquals(140, leftSize);
+      assertEquals(75, rightSize);
+    }
+
+    if (leftSmaller) {
+      PhysicalExec [] ordered = phyPlanner.switchJoinSidesIfNecessary(ctx, 
joinNode, joinExec.getLeftChild(),
+          joinExec.getRightChild());
+      assertEquals(ordered[0], joinExec.getLeftChild());
+      assertEquals(ordered[1], joinExec.getRightChild());
+
+      assertEquals("default.p", left[0]);
+      assertEquals("default.e", right[0]);
+    } else {
+      PhysicalExec [] ordered = phyPlanner.switchJoinSidesIfNecessary(ctx, 
joinNode, joinExec.getLeftChild(),
+          joinExec.getRightChild());
+      assertEquals(ordered[1], joinExec.getLeftChild());
+      assertEquals(ordered[0], joinExec.getRightChild());
+
+      assertEquals("default.e", left[0]);
+      assertEquals("default.p", right[0]);
+    }
+
+    if (leftSmaller) {
+      assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, 
joinNode.getLeftChild(), true));
+      assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, 
joinNode.getRightChild(), false));
+    } else {
+      assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, 
joinNode.getLeftChild(), true));
+      assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, 
joinNode.getRightChild(), false));
+    }
+
+    return leftSmaller;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashPartitioner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashPartitioner.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashPartitioner.java
new file mode 100644
index 0000000..1146e85
--- /dev/null
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashPartitioner.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link HashPartitioner}.
+ */
+public class TestHashPartitioner {
+
+  @Before
+  public void setUp() throws Exception {
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  /** Builds a three-column tuple of INT4 datums. */
+  private static VTuple int4Tuple(int first, int second, int third) {
+    VTuple tuple = new VTuple(3);
+    tuple.put(new Datum[] {
+        DatumFactory.createInt4(first),
+        DatumFactory.createInt4(second),
+        DatumFactory.createInt4(third)
+    });
+    return tuple;
+  }
+
+  /**
+   * Tuples that agree on the partition key columns (0 and 1) must be assigned to the same
+   * partition, whatever the value of the non-key column 2.
+   */
+  @Test
+  public final void testGetPartition() {
+    VTuple tuple1 = int4Tuple(1, 2, 3);
+    VTuple tuple2 = int4Tuple(1, 2, 4);
+    VTuple tuple3 = int4Tuple(1, 2, 5);
+    VTuple tuple4 = int4Tuple(2, 2, 3);
+    VTuple tuple5 = int4Tuple(2, 2, 4);
+
+    int [] partKeys = {0,1};
+    Partitioner p = new HashPartitioner(partKeys, 2);
+
+    // key (1, 2)
+    int part1 = p.getPartition(tuple1);
+    assertEquals(part1, p.getPartition(tuple2));
+    assertEquals(part1, p.getPartition(tuple3));
+
+    // key (2, 2)
+    int part2 = p.getPartition(tuple4);
+    assertEquals(part2, p.getPartition(tuple5));
+  }
+
+  /**
+   * Partitions 1000 random three-column text tuples for several partition counts and checks
+   * that, for each count, at most 5 partition ids remain unused.
+   */
+  @Test
+  public final void testGetPartition2() {
+    // https://issues.apache.org/jira/browse/TAJO-976
+    // The seed is captured and reported on failure so that a failing data set can be reproduced.
+    long seed = System.nanoTime();
+    Random rand = new Random(seed);
+    String[][] data = new String[1000][];
+
+    for (int i = 0; i < data.length; i++) {
+      data[i] = new String[]{
+          String.valueOf(rand.nextInt(1000)),
+          String.valueOf(rand.nextInt(1000)),
+          String.valueOf(rand.nextInt(1000))};
+    }
+
+    int[] testNumPartitions = new int[]{31, 62, 124, 32, 63, 125};
+    for (int numPartitions : testNumPartitions) {
+      Partitioner p = new HashPartitioner(new int[]{0, 1, 2}, numPartitions);
+
+      Set<Integer> ids = new TreeSet<Integer>();
+      for (String[] row : data) {
+        Tuple tuple = new VTuple(
+            new Datum[]{new TextDatum(row[0]), new TextDatum(row[1]), new TextDatum(row[2])});
+
+        ids.add(p.getPartition(tuple));
+      }
+
+      // The number of partitions isn't exactly matched.
+      assertTrue("seed=" + seed + ", numPartitions=" + numPartitions + ", used=" + ids.size(),
+          ids.size() + 5 >= numPartitions);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
new file mode 100644
index 0000000..0d996de
--- /dev/null
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.plan.LogicalOptimizer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestHashSemiJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/TestHashJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private LogicalOptimizer optimizer;
+  private Path testDir;
+  private QueryContext queryContext;
+
+  private TableDesc employee;
+  private TableDesc people;
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, 
testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+
+    Schema employeeSchema = new Schema();
+    employeeSchema.addColumn("managerid", Type.INT4);
+    employeeSchema.addColumn("empid", Type.INT4);
+    employeeSchema.addColumn("memid", Type.INT4);
+    employeeSchema.addColumn("deptname", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta("TEXT");
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(employeeMeta, employeeSchema, employeePath);
+    appender.init();
+    VTuple tuple = new VTuple(employeeSchema.size());
+
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), // empid [0-9]
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+    employee = CatalogUtil.newTableDesc("default.employee", employeeSchema, 
employeeMeta, employeePath);
+    catalog.createTable(employee);
+
+    Schema peopleSchema = new Schema();
+    peopleSchema.addColumn("empid", Type.INT4);
+    peopleSchema.addColumn("fk_memid", Type.INT4);
+    peopleSchema.addColumn("name", Type.TEXT);
+    peopleSchema.addColumn("age", Type.INT4);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta("TEXT");
+    Path peoplePath = new Path(testDir, "people.csv");
+    appender = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender.init();
+    tuple = new VTuple(peopleSchema.size());
+    // make 27 tuples
+    for (int i = 1; i < 10; i += 2) {
+      // make three duplicated tuples for each tuples
+      for (int j = 0; j < 3; j++) {
+        tuple.put(new Datum[] {
+            DatumFactory.createInt4(i), // empid [1, 3, 5, 7, 9]
+            DatumFactory.createInt4(10 + i),
+            DatumFactory.createText("name_" + i),
+            DatumFactory.createInt4(30 + i) });
+        appender.addTuple(tuple);
+      }
+    }
+
+    appender.flush();
+    appender.close();
+
+    queryContext = new QueryContext(conf);
+    people = CatalogUtil.newTableDesc("default.people", peopleSchema, 
peopleMeta, peoplePath);
+    catalog.createTable(people);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+    optimizer = new LogicalOptimizer(conf, catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+
+  // relation descriptions
+  // employee (managerid, empid, memid, deptname)
+  // people (empid, fk_memid, name, age)
+
+  String[] QUERIES = {
+      "select managerId, e.empId, deptName, e.memId from employee as e, people 
as p where e.empId = p.empId"
+  };
+
+  @Test
+  public final void testHashSemiJoin() throws IOException, TajoException {
+    FileFragment[] empFrags = FileTablespace.splitNG(conf, "default.e", 
employee.getMeta(),
+        new Path(employee.getUri()), Integer.MAX_VALUE);
+    FileFragment[] peopleFrags = FileTablespace.splitNG(conf, "default.p", 
people.getMeta(),
+        new Path(people.getUri()), Integer.MAX_VALUE);
+
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = 
CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/testHashSemiJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(queryContext,
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = 
planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
+    optimizer.optimize(plan);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    // replace an equal join with an hash anti join.
+    if (exec instanceof MergeJoinExec) {
+      MergeJoinExec join = (MergeJoinExec) exec;
+      ExternalSortExec sortLeftChild = (ExternalSortExec) join.getLeftChild();
+      ExternalSortExec sortRightChild = (ExternalSortExec) 
join.getRightChild();
+      SeqScanExec scanLeftChild = sortLeftChild.getChild();
+      SeqScanExec scanRightChild = sortRightChild.getChild();
+
+      // 'people' should be outer table. So, the below code guarantees that 
people becomes the outer table.
+      if (scanLeftChild.getTableName().equals("default.people")) {
+        exec = new HashLeftSemiJoinExec(ctx, join.getPlan(), scanRightChild, 
scanLeftChild);
+      } else {
+        exec = new HashLeftSemiJoinExec(ctx, join.getPlan(), scanLeftChild, 
scanRightChild);
+      }
+    } else if (exec instanceof HashJoinExec) {
+      HashJoinExec join = (HashJoinExec) exec;
+      SeqScanExec scanLeftChild = (SeqScanExec) join.getLeftChild();
+
+      // 'people' should be outer table. So, the below code guarantees that 
people becomes the outer table.
+      if (scanLeftChild.getTableName().equals("default.people")) {
+        exec = new HashLeftSemiJoinExec(ctx, join.getPlan(), 
join.getRightChild(), join.getLeftChild());
+      } else {
+        exec = new HashLeftSemiJoinExec(ctx, join.getPlan(), 
join.getLeftChild(), join.getRightChild());
+      }
+    }
+
+    Tuple tuple;
+    int count = 0;
+    int i = 1;
+    exec.init();
+    // expect result without duplicated tuples.
+    while ((tuple = exec.next()) != null) {
+      count++;
+      assertTrue(i == tuple.getInt4(0));
+      assertTrue(i == tuple.getInt4(1));
+      assertTrue(("dept_" + i).equals(tuple.getText(2)));
+      assertTrue(10 + i == tuple.getInt4(3));
+
+      i += 2;
+    }
+    exec.close();
+    assertEquals(5 , count); // the expected result: [1, 3, 5, 7, 9]
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
new file mode 100644
index 0000000..76e38e4
--- /dev/null
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
@@ -0,0 +1,440 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.JoinNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestLeftOuterHashJoinExec {
+  // Cluster/planner infrastructure; every field below is (re)assigned in setUp().
+  private TajoConf conf;
+  private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/TestLeftOuterHashJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private Path testDir;
+  private QueryContext defaultContext;
+
+  // Descriptors of the four fixture tables that setUp() writes and registers.
+  private TableDesc dep3;
+  private TableDesc job3;
+  private TableDesc emp3;
+  private TableDesc phone3;
+
+  // Qualified names (database name + table name) under which the fixture tables are registered.
+  private final String DEP3_NAME = 
CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep3");
+  private final String JOB3_NAME = 
CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "job3");
+  private final String EMP3_NAME = 
CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "emp3");
+  private final String PHONE3_NAME = 
CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "phone3");
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, 
testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+
+    //----------------- dep3 ------------------------------
+    // dep_id | dep_name  | loc_id
+    //--------------------------------
+    //  0     | dep_0     | 1000
+    //  1     | dep_1     | 1001
+    //  2     | dep_2     | 1002
+    //  3     | dep_3     | 1003
+    //  4     | dep_4     | 1004
+    //  5     | dep_5     | 1005
+    //  6     | dep_6     | 1006
+    //  7     | dep_7     | 1007
+    //  8     | dep_8     | 1008
+    //  9     | dep_9     | 1009
+    Schema dep3Schema = new Schema();
+    dep3Schema.addColumn("dep_id", Type.INT4);
+    dep3Schema.addColumn("dep_name", Type.TEXT);
+    dep3Schema.addColumn("loc_id", Type.INT4);
+
+
+    TableMeta dep3Meta = CatalogUtil.newTableMeta("TEXT");
+    Path dep3Path = new Path(testDir, "dep3.csv");
+    Appender appender1 = ((FileTablespace) 
TablespaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path);
+    appender1.init();
+    VTuple tuple = new VTuple(dep3Schema.size());
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+                    DatumFactory.createText("dept_" + i),
+                    DatumFactory.createInt4(1000 + i) });
+      appender1.addTuple(tuple);
+    }
+
+    appender1.flush();
+    appender1.close();
+    dep3 = CatalogUtil.newTableDesc(DEP3_NAME, dep3Schema, dep3Meta, dep3Path);
+    catalog.createTable(dep3);
+
+    //----------------- job3 ------------------------------
+    //  job_id  | job_title
+    // ----------------------
+    //   101    |  job_101
+    //   102    |  job_102
+    //   103    |  job_103
+
+    Schema job3Schema = new Schema();
+    job3Schema.addColumn("job_id", Type.INT4);
+    job3Schema.addColumn("job_title", Type.TEXT);
+
+
+    TableMeta job3Meta = CatalogUtil.newTableMeta("TEXT");
+    Path job3Path = new Path(testDir, "job3.csv");
+    Appender appender2 = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(job3Meta, job3Schema, job3Path);
+    appender2.init();
+    VTuple tuple2 = new VTuple(job3Schema.size());
+    for (int i = 1; i < 4; i++) {
+      int x = 100 + i;
+      tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
+                    DatumFactory.createText("job_" + x) });
+      appender2.addTuple(tuple2);
+    }
+
+    appender2.flush();
+    appender2.close();
+    job3 = CatalogUtil.newTableDesc(JOB3_NAME, job3Schema, job3Meta, job3Path);
+    catalog.createTable(job3);
+
+
+
+    //---------------------emp3 --------------------
+    // emp_id  | first_name | last_name | dep_id | salary | job_id
+    // ------------------------------------------------------------
+    //  11     |  fn_11     |  ln_11    |  1     | 123    | 101
+    //  13     |  fn_13     |  ln_13    |  3     | 369    | 103
+    //  15     |  fn_15     |  ln_15    |  5     | 615    | null
+    //  17     |  fn_17     |  ln_17    |  7     | 861    | null
+    //  19     |  fn_19     |  ln_19    |  9     | 1107   | null
+    //  21     |  fn_21     |  ln_21    |  1     | 123    | 101
+    //  23     |  fn_23     |  ln_23    |  3     | 369    | 103
+
+    Schema emp3Schema = new Schema();
+    emp3Schema.addColumn("emp_id", Type.INT4);
+    emp3Schema.addColumn("first_name", Type.TEXT);
+    emp3Schema.addColumn("last_name", Type.TEXT);
+    emp3Schema.addColumn("dep_id", Type.INT4);
+    emp3Schema.addColumn("salary", Type.FLOAT4);
+    emp3Schema.addColumn("job_id", Type.INT4);
+
+
+    TableMeta emp3Meta = CatalogUtil.newTableMeta("TEXT");
+    Path emp3Path = new Path(testDir, "emp3.csv");
+    Appender appender3 = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(emp3Meta, emp3Schema, emp3Path);
+    appender3.init();
+    VTuple tuple3 = new VTuple(emp3Schema.size());
+
+    for (int i = 1; i < 4; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+
+      int y = 20 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i),
+          DatumFactory.createText("firstname_" + y),
+          DatumFactory.createText("lastname_" + y),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+    }
+
+    for (int i = 5; i < 10; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createNullDatum() });
+      appender3.addTuple(tuple3);
+    }
+
+    appender3.flush();
+    appender3.close();
+    emp3 = CatalogUtil.newTableDesc(EMP3_NAME, emp3Schema, emp3Meta, emp3Path);
+    catalog.createTable(emp3);
+
+    //---------------------phone3 --------------------
+    // emp_id  | phone_number
+    // -----------------------------------------------
+    // this table is empty, no rows
+
+    Schema phone3Schema = new Schema();
+    phone3Schema.addColumn("emp_id", Type.INT4);
+    phone3Schema.addColumn("phone_number", Type.TEXT);
+
+
+    TableMeta phone3Meta = CatalogUtil.newTableMeta("TEXT");
+    Path phone3Path = new Path(testDir, "phone3.csv");
+    Appender appender5 = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(phone3Meta, phone3Schema, phone3Path);
+    appender5.init();
+    
+    appender5.flush();
+    appender5.close();
+    phone3 = CatalogUtil.newTableDesc(PHONE3_NAME, phone3Schema, phone3Meta, 
phone3Path);
+    catalog.createTable(phone3);
+
+
+
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+    defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
+  }
+
+  /** Shuts down the catalog cluster after each test. */
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  // Left outer join queries; test method N of this class plans and runs QUERIES[N].
+  String[] QUERIES = {
+      // [0] no nulls
+      "select dep3.dep_id, dep_name, emp_id, salary from dep3 left outer join 
emp3 on dep3.dep_id = emp3.dep_id",
+      // [1] nulls on the right operand
+      "select job3.job_id, job_title, emp_id, salary from job3 left outer join 
emp3 on job3.job_id=emp3.job_id",
+      // [2] nulls on the left side
+      "select job3.job_id, job_title, emp_id, salary from emp3 left outer join 
job3 on job3.job_id=emp3.job_id",
+      // [3] the right operand (phone3) is empty
+      "select emp3.emp_id, first_name, phone_number from emp3 left outer join 
phone3 on emp3.emp_id = phone3.emp_id",
+      // [4] the left operand (phone3) is empty
+      "select phone_number, emp3.emp_id, first_name from phone3 left outer 
join emp3 on emp3.emp_id = phone3.emp_id"
+  };
+
+  /**
+   * Runs QUERIES[0] (dep3 left outer join emp3 on dep_id) with the in-memory hash join
+   * enforced, checks that the projection sits on top of a HashLeftOuterJoinExec, and
+   * counts the produced rows. Departments 1 and 3 match two employees each and the other
+   * eight departments yield one row each, hence 12 rows.
+   */
+  @Test
+  public final void testLeftOuterHashJoinExec0() throws IOException, TajoException {
+    LogicalNode rootNode = planner.createPlan(defaultContext, analyzer.parse(QUERIES[0]))
+        .getRootBlock().getRoot();
+    JoinNode topJoin = PlannerUtil.findTopNode(rootNode, NodeType.JOIN);
+    Enforcer hashJoinEnforcer = new Enforcer();
+    hashJoinEnforcer.enforceJoinAlgorithm(topJoin.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+    FileFragment[] leftFragments = FileTablespace.splitNG(
+        conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] rightFragments = FileTablespace.splitNG(
+        conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] allFragments = TUtil.concat(leftFragments, rightFragments);
+
+    Path taskDir = CommonTestingUtil.getTestDir(
+        TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestLeftOuterHashJoinExec0");
+    TaskAttemptContext taskContext = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), allFragments, taskDir);
+    taskContext.setEnforcer(hashJoinEnforcer);
+
+    PhysicalExec exec = new PhysicalPlannerImpl(conf).createPlan(taskContext, rootNode);
+
+    // The enforced algorithm must show up directly below the projection.
+    assertTrue(((ProjectionExec) exec).getChild() instanceof HashLeftOuterJoinExec);
+
+    int rowCount = 0;
+    exec.init();
+    while (exec.next() != null) {
+      //TODO check contents
+      rowCount++;
+    }
+    exec.close();
+    assertEquals(12, rowCount);
+  }
+
+
+  /**
+   * Runs QUERIES[1] (job3 left outer join emp3 on job_id), where the right operand
+   * contains NULL join keys. No join algorithm is enforced. Jobs 101 and 103 match two
+   * employees each and job 102 matches none, so 2 + 1 + 2 = 5 rows are expected.
+   */
+  @Test
+  public final void testLeftOuter_HashJoinExec1() throws IOException, TajoException {
+    FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(),
+        new Path(job3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(),
+        new Path(emp3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir(
+        TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestLeftOuter_HashJoinExec1");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[1]);
+    LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    // The original code cast to ProjectionExec and discarded the result; state the
+    // expectation explicitly instead of relying on a ClassCastException.
+    assertTrue(exec instanceof ProjectionExec);
+
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      //TODO check contents
+      count++;
+    }
+    exec.close();
+    assertEquals(5, count);
+  }
+
+  /**
+   * Runs QUERIES[2] (emp3 left outer join job3 on job_id), where the left operand
+   * contains NULL join keys. No join algorithm is enforced. Every one of the seven emp3
+   * rows matches at most one job, so exactly 7 rows are expected.
+   */
+  @Test
+  public final void testLeftOuter_HashJoinExec2() throws IOException, TajoException {
+    FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(),
+        new Path(emp3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(),
+        new Path(job3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir(
+        TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestLeftOuter_HashJoinExec2");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[2]);
+    LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    // The original code cast to ProjectionExec and discarded the result; state the
+    // expectation explicitly instead of relying on a ClassCastException.
+    assertTrue(exec instanceof ProjectionExec);
+
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      //TODO check contents
+      count++;
+    }
+    exec.close();
+    assertEquals(7, count);
+  }
+
+
+  /**
+   * Runs QUERIES[3] (emp3 left outer join phone3 on emp_id), where the right operand is
+   * an empty table. No join algorithm is enforced. Each of the seven emp3 rows must
+   * still be emitted once, so 7 rows are expected.
+   */
+  @Test
+  public final void testLeftOuter_HashJoinExec3() throws IOException, TajoException {
+    FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(),
+        new Path(emp3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] phone3Frags = FileTablespace.splitNG(conf, PHONE3_NAME, phone3.getMeta(),
+        new Path(phone3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir(
+        TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestLeftOuter_HashJoinExec3");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[3]);
+    LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    // The original code cast to ProjectionExec and discarded the result; state the
+    // expectation explicitly instead of relying on a ClassCastException.
+    assertTrue(exec instanceof ProjectionExec);
+
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      //TODO check contents
+      count++;
+    }
+    exec.close();
+    assertEquals(7, count);
+  }
+
+  
+  /**
+   * Runs QUERIES[4] (phone3 left outer join emp3 on emp_id), where the left operand is
+   * an empty table. No join algorithm is enforced. With no left rows there is nothing to
+   * preserve, so 0 rows are expected.
+   */
+  @Test
+  public final void testLeftOuter_HashJoinExec4() throws IOException, TajoException {
+    // Use the shared table-name constants, as the sibling tests do, instead of
+    // repeating the qualified names as string literals.
+    FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(),
+        new Path(emp3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] phone3Frags = FileTablespace.splitNG(conf, PHONE3_NAME, phone3.getMeta(),
+        new Path(phone3.getUri()), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir(
+        TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestLeftOuter_HashJoinExec4");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[4]);
+    LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    // The original code cast to ProjectionExec and discarded the result; state the
+    // expectation explicitly instead of relying on a ClassCastException.
+    assertTrue(exec instanceof ProjectionExec);
+
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      //TODO check contents
+      count++;
+    }
+    exec.close();
+    assertEquals(0, count);
+  }
+  
+
+
+}
+ //--camelia

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
new file mode 100644
index 0000000..2c26cc2
--- /dev/null
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.JoinNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestMergeJoinExec {
+  // Cluster/planner infrastructure; the non-final fields are assigned in setUp().
+  private TajoConf conf;
+  private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
"/TestMergeJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+
+  // Descriptors of the two join inputs that setUp() writes and registers.
+  private TableDesc employee;
+  private TableDesc people;
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, 
testDir.toUri().toString());
+    catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, 
DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+
+    Schema employeeSchema = new Schema();
+    employeeSchema.addColumn("managerid", Type.INT4);
+    employeeSchema.addColumn("empid", Type.INT4);
+    employeeSchema.addColumn("memid", Type.INT4);
+    employeeSchema.addColumn("deptname", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta("TEXT");
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(employeeMeta, employeeSchema, employeePath);
+    appender.init();
+    VTuple tuple = new VTuple(employeeSchema.size());
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+    for (int i = 11; i < 20; i+=2) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+    employee = CatalogUtil.newTableDesc("default.employee", employeeSchema, 
employeeMeta, employeePath);
+    catalog.createTable(employee);
+
+    Schema peopleSchema = new Schema();
+    peopleSchema.addColumn("empid", Type.INT4);
+    peopleSchema.addColumn("fk_memid", Type.INT4);
+    peopleSchema.addColumn("name", Type.TEXT);
+    peopleSchema.addColumn("age", Type.INT4);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta("TEXT");
+    Path peoplePath = new Path(testDir, "people.csv");
+    appender = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender.init();
+    tuple = new VTuple(peopleSchema.size());
+    for (int i = 1; i < 10; i += 2) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("name_" + i),
+          DatumFactory.createInt4(30 + i) });
+      appender.addTuple(tuple);
+    }
+    for (int i = 10; i < 20; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("name_" + i),
+          DatumFactory.createInt4(30 + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+
+    people = CatalogUtil.newTableDesc("default.people", peopleSchema, 
peopleMeta, peoplePath);
+    catalog.createTable(people);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
+  }
+
+  /** Shuts down the catalog cluster after each test. */
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  // [0] inner join of employee and people on both empId and memId = fk_memId
+  String[] QUERIES = {
+      "select managerId, e.empId, deptName, e.memId from employee as e inner 
join " +
+          "people as p on e.empId = p.empId and e.memId = p.fk_memId"
+  };
+
+  /**
+   * Enforces a merge join for QUERIES[0], checks that the projection sits on top of a
+   * MergeJoinExec, and verifies every joined row.
+   *
+   * employee holds ids 0..9 and 11, 13, ..., 19; people holds ids 1, 3, ..., 9 and 10..19.
+   * The ids present in both tables are the odd numbers 1..19, so the join must return
+   * exactly those 10 rows in ascending order.
+   */
+  @Test
+  public final void testMergeInnerJoin() throws IOException, TajoException {
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
+    LogicalNode root = plan.getRootBlock().getRoot();
+
+    JoinNode joinNode = PlannerUtil.findTopNode(root, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    // Fragments are registered under the table aliases used in the query (e and p).
+    FileFragment[] empFrags = FileTablespace.splitNG(conf, "default.e", employee.getMeta(),
+        new Path(employee.getUri()), Integer.MAX_VALUE);
+    FileFragment[] peopleFrags = FileTablespace.splitNG(conf, "default.p", people.getMeta(),
+        new Path(people.getUri()), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = CommonTestingUtil.getTestDir(
+        TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testMergeInnerJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, root);
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof MergeJoinExec);
+
+    Tuple tuple;
+    int count = 0;
+    int i = 1; // expected id of the next joined row: 1, 3, 5, ...
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      count++;
+      // assertEquals reports both values on failure, unlike assertTrue(a == b).
+      assertEquals(i, tuple.getInt4(0));
+      assertEquals(i, tuple.getInt4(1));
+      assertEquals("dept_" + i, tuple.getText(2));
+      assertEquals(10 + i, tuple.getInt4(3));
+
+      i += 2;
+    }
+    exec.close();
+    assertEquals(10, count); // the expected result: [1, 3, 5, ..., 19]
+  }
+}

Reply via email to