http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
new file mode 100644
index 0000000..6eb2841
--- /dev/null
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -0,0 +1,1301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.DeflateCodec;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.exception.ReturnStateUtil;
+import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.jdbc.FetchResultSet;
+import org.apache.tajo.jdbc.TajoMemoryResultSet;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.querymaster.QueryMasterTask;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.worker.TajoWorker;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.util.*;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static 
org.apache.tajo.plan.serder.PlanProto.ShuffleType.SCATTERED_HASH_SHUFFLE;
+import static org.junit.Assert.*;
+
+@RunWith(Parameterized.class)
+public class TestTablePartitions extends QueryTestCaseBase {
+
  // Write path under test: INSERT (into a pre-created table) or CREATE_TABLE (CTAS).
  // Set once per parameterized run; see generateParameters().
  private NodeType nodeType;

  public TestTablePartitions(NodeType nodeType) throws IOException {
    super(TajoConstants.DEFAULT_DATABASE_NAME);
    this.nodeType = nodeType;
  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][] {
+      //type
+      {NodeType.INSERT},
+      {NodeType.CREATE_TABLE},
+    });
+  }
+
  @Test
  public final void testCreateColumnPartitionedTable() throws Exception {
    ResultSet res = null;
    String tableName = CatalogUtil.normalizeIdentifier("testCreateColumnPartitionedTable");

    if (nodeType == NodeType.INSERT) {
      // Create the partitioned table first, then populate it with INSERT OVERWRITE.
      res = executeString(
        "create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) ");
      res.close();

      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
      // Physical schema holds only the data columns (2); the logical schema also
      // includes the partition column "key" (3).
      assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size());
      assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size());

      res = testBase.execute(
        "insert overwrite into " + tableName + " select l_orderkey, l_partkey, " +
          "l_quantity from lineitem");
    } else {
      // CTAS path: create and populate in a single statement.
      res = testBase.execute(
        "create table " + tableName + "(col1 int4, col2 int4) partition by column(key float8) "
          + " as select l_orderkey, l_partkey, l_quantity from lineitem");
    }

    // Inspect the distributed plan: root -> insert/create node -> scan.
    MasterPlan plan = getQueryPlan(res);
    ExecutionBlock rootEB = plan.getRoot();

    assertEquals(1, plan.getChildCount(rootEB.getId()));

    ExecutionBlock insertEB = plan.getChild(rootEB.getId(), 0);
    assertNotNull(insertEB);

    assertEquals(nodeType, insertEB.getPlan().getType());
    assertEquals(1, plan.getChildCount(insertEB.getId()));

    ExecutionBlock scanEB = plan.getChild(insertEB.getId(), 0);

    // Writing a partitioned table should shuffle by the single partition key
    // using a scattered hash shuffle.
    List<DataChannel> list = plan.getOutgoingChannels(scanEB.getId());
    assertEquals(1, list.size());
    DataChannel channel = list.get(0);
    assertNotNull(channel);
    assertEquals(SCATTERED_HASH_SHUFFLE, channel.getShuffleType());
    assertEquals(1, channel.getShuffleKeys().length);

    // Catalog-registered partitions must match the on-disk directories and row count.
    TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"},
      tableDesc.getStats().getNumRows());

    executeString("DROP TABLE " + tableName + " PURGE").close();
    res.close();
  }
+
  @Test
  public final void testCreateColumnPartitionedTableWithJoin() throws Exception {
    ResultSet res = null;
    String tableName = CatalogUtil.normalizeIdentifier("testCreateColumnPartitionedTableWithJoin");

    if (nodeType == NodeType.INSERT) {
      res = executeString(
        "create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) ");
      res.close();

      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
      // 2 data columns physically; 3 logically including the partition column.
      assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size());
      assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size());

      // Populate from a join, so the write sits on top of a multi-stage plan.
      res = testBase.execute(
        "insert overwrite into " + tableName + " select l_orderkey, l_partkey, " +
          "l_quantity from lineitem join orders on l_orderkey = o_orderkey");

    } else {
      res = testBase.execute("create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) "
        + " AS select l_orderkey, l_partkey, l_quantity from lineitem join orders on l_orderkey = o_orderkey");
    }

    // Plan shape: root -> insert/create node -> (join) stage feeding it.
    MasterPlan plan = getQueryPlan(res);
    ExecutionBlock rootEB = plan.getRoot();

    assertEquals(1, plan.getChildCount(rootEB.getId()));

    ExecutionBlock insertEB = plan.getChild(rootEB.getId(), 0);
    assertNotNull(insertEB);
    assertEquals(nodeType, insertEB.getPlan().getType());
    assertEquals(1, plan.getChildCount(insertEB.getId()));

    ExecutionBlock scanEB = plan.getChild(insertEB.getId(), 0);

    // Even with a join upstream, the final write shuffles on the partition key.
    List<DataChannel> list = plan.getOutgoingChannels(scanEB.getId());
    assertEquals(1, list.size());
    DataChannel channel = list.get(0);
    assertNotNull(channel);
    assertEquals(SCATTERED_HASH_SHUFFLE, channel.getShuffleType());
    assertEquals(1, channel.getShuffleKeys().length);

    TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"},
      tableDesc.getStats().getNumRows());

    executeString("DROP TABLE " + tableName + " PURGE").close();
    res.close();
  }
+
  @Test
  public final void testCreateColumnPartitionedTableWithSelectedColumns() throws Exception {
    ResultSet res = null;
    String tableName = CatalogUtil.normalizeIdentifier("testCreateColumnPartitionedTableWithSelectedColumns");

    if (nodeType == NodeType.INSERT) {
      res = executeString(
        "create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by column(key float8) ");
      res.close();

      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
      // 3 data columns physically; 4 logically including the partition column.
      assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size());
      assertEquals(4, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size());

      // Insert with an explicit column list that skips null_col, which stays NULL.
      res = executeString("insert overwrite into " + tableName + " (col1, col2, key) select l_orderkey, " +
        "l_partkey, l_quantity from lineitem");
    } else {
      // CTAS equivalent: the unselected column is supplied as a literal null.
      res = executeString("create table " + tableName + " (col1 int4, col2 int4, null_col int4)"
        + " partition by column(key float8) AS select l_orderkey, l_partkey, null, l_quantity from lineitem");
    }
    res.close();

    TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"},
      tableDesc.getStats().getNumRows());

    executeString("DROP TABLE " + tableName + " PURGE").close();
  }
+
+  @Test
+  public final void testColumnPartitionedTableByOneColumn() throws Exception {
+    ResultSet res = null;
+    String tableName = 
CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByOneColumn");
+
+    if (nodeType == NodeType.INSERT) {
+      res = executeString(
+        "create table " + tableName + " (col1 int4, col2 int4, null_col int4) 
partition by column(key float8) ");
+      res.close();
+
+      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+
+      res = executeString("insert overwrite into " + tableName
+        + " (col1, col2, key) select l_orderkey, l_partkey, l_quantity from 
lineitem");
+    } else {
+      res = executeString("create table " + tableName + " (col1 int4, col2 
int4, null_col int4) "
+        + " partition by column(key float8) as select l_orderkey, l_partkey, 
null, l_quantity from lineitem");
+    }
+    res.close();
+
+
+    TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
+    assertPartitionDirectories(desc);
+
+    res = executeString(
+        "select distinct * from " + tableName + " where (key = 45.0 or key = 
38.0) and null_col is null");
+
+    Map<Double, int []> resultRows1 = Maps.newHashMap();
+    resultRows1.put(45.0d, new int[]{3, 2});
+    resultRows1.put(38.0d, new int[]{2, 2});
+
+    for (int i = 0; i < 2; i++) {
+      assertTrue(res.next());
+      assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(1));
+      assertEquals(resultRows1.get(res.getDouble(4))[1], res.getInt(2));
+    }
+
+    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName,
+      new String[]{"key"}, desc.getStats().getNumRows());
+
+    executeString("DROP TABLE " + tableName + " PURGE").close();
+    res.close();
+  }
+
+  private void assertPartitionDirectories(TableDesc desc) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    Path path = new Path(desc.getUri());
+    assertTrue(fs.isDirectory(path));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=17.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=36.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=38.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=45.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=49.0")));
+    if (!testingCluster.isHiveCatalogStoreRunning()) {
+      assertEquals(5, desc.getStats().getNumRows().intValue());
+    }
+  }
+
+  @Test
+  public final void testQueryCasesOnColumnPartitionedTable() throws Exception {
+    ResultSet res = null;
+    String tableName = 
CatalogUtil.normalizeIdentifier("testQueryCasesOnColumnPartitionedTable");
+
+    if (nodeType == NodeType.INSERT) {
+      res = executeString(
+        "create table " + tableName + " (col1 int4, col2 int4, null_col int4) 
partition by column(key float8) ");
+      res.close();
+
+      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+
+      res = executeString(
+        "insert overwrite into " + tableName
+          + " (col1, col2, key) select l_orderkey, l_partkey, l_quantity from 
lineitem");
+    } else {
+      res = executeString("create table " + tableName + " (col1 int4, col2 
int4, null_col int4) "
+        + " partition by column(key float8) as select l_orderkey, l_partkey, 
null, l_quantity from lineitem");
+    }
+    res.close();
+
+    TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
+    assertPartitionDirectories(desc);
+
+    res = executeFile("case1.sql");
+    assertResultSet(res, "case1.result");
+    res.close();
+
+    res = executeFile("case2.sql");
+    assertResultSet(res, "case2.result");
+    res.close();
+
+    res = executeFile("case3.sql");
+    assertResultSet(res, "case3.result");
+    res.close();
+
+    // select pow(key, 2) from testQueryCasesOnColumnPartitionedTable
+    res = executeFile("case4.sql");
+    assertResultSet(res, "case4.result");
+    res.close();
+
+    // select round(pow(key + 1, 2)) from 
testQueryCasesOnColumnPartitionedTable
+    res = executeFile("case5.sql");
+    assertResultSet(res, "case5.result");
+    res.close();
+
+    // select col1, key from testQueryCasesOnColumnPartitionedTable order by 
pow(key, 2) desc
+    res = executeFile("case6.sql");
+    assertResultSet(res, "case6.result");
+    res.close();
+
+    // select col1, key from testQueryCasesOnColumnPartitionedTable WHERE key 
BETWEEN 35 AND 48;
+    res = executeFile("case7.sql");
+    assertResultSet(res, "case7.result");
+    res.close();
+
+    // select col1, CASE key WHEN 36 THEN key WHEN 49 THEN key ELSE key END 
from testQueryCasesOnColumnPartitionedTable;
+    res = executeFile("case8.sql");
+    assertResultSet(res, "case8.result");
+    res.close();
+
+    // select col1, CAST(key AS INT) from 
testQueryCasesOnColumnPartitionedTable;
+    res = executeFile("case9.sql");
+    assertResultSet(res, "case9.result");
+    res.close();
+
+    // select col1, (!(key > 35)) from testQueryCasesOnColumnPartitionedTable;
+    res = executeFile("case10.sql");
+    assertResultSet(res, "case10.result");
+    res.close();
+
+    // alias partition column
+    res = executeFile("case11.sql");
+    assertResultSet(res, "case11.result");
+    res.close();
+
+    // alias partition column in group by, order by
+    res = executeFile("case12.sql");
+    assertResultSet(res, "case12.result");
+    res.close();
+
+    // alias partition column in stage
+    res = executeFile("case13.sql");
+    assertResultSet(res, "case13.result");
+    res.close();
+
+    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new 
String[]{"key"},
+      desc.getStats().getNumRows());
+
+    executeString("DROP TABLE " + tableName + " PURGE").close();
+    res.close();
+  }
+
+  @Test
+  public final void testColumnPartitionedTableByThreeColumns() throws 
Exception {
+    ResultSet res = null;
+    String tableName = 
CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByThreeColumns");
+
+    if (nodeType == NodeType.INSERT) {
+      res = testBase.execute(
+        "create table " + tableName + " (col4 text) partition by column(col1 
int4, col2 int4, col3 float8) ");
+      res.close();
+      TajoTestingCluster cluster = testBase.getTestingCluster();
+      CatalogService catalog = cluster.getMaster().getCatalog();
+      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+
+      res = executeString("insert overwrite into " + tableName
+        + " select l_returnflag, l_orderkey, l_partkey, l_quantity from 
lineitem");
+    } else {
+      res = executeString( "create table " + tableName + " (col4 text) "
+        + " partition by column(col1 int4, col2 int4, col3 float8) as select 
l_returnflag, l_orderkey, l_partkey, " +
+        "l_quantity from lineitem");
+    }
+    res.close();
+
+    TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
+    Path path = new Path(desc.getUri());
+
+    FileSystem fs = FileSystem.get(conf);
+    assertTrue(fs.isDirectory(path));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + 
"/col1=1/col2=1/col3=17.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + 
"/col1=2/col2=2/col3=38.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + 
"/col1=3/col2=2/col3=45.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + 
"/col1=3/col2=3/col3=49.0")));
+    if (!testingCluster.isHiveCatalogStoreRunning()) {
+      assertEquals(5, desc.getStats().getNumRows().intValue());
+    }
+
+    res = executeString("select * from " + tableName + " where col2 = 2");
+
+    Map<Double, int []> resultRows1 = Maps.newHashMap();
+    resultRows1.put(45.0d, new int[]{3, 2});
+    resultRows1.put(38.0d, new int[]{2, 2});
+
+
+    for (int i = 0; i < 2; i++) {
+      assertTrue(res.next());
+      assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(2));
+      assertEquals(resultRows1.get(res.getDouble(4))[1], res.getInt(3));
+    }
+    res.close();
+
+    Map<Double, int []> resultRows2 = Maps.newHashMap();
+    resultRows2.put(49.0d, new int[]{3, 3});
+    resultRows2.put(45.0d, new int[]{3, 2});
+    resultRows2.put(38.0d, new int[]{2, 2});
+
+    res = executeString("select * from " + tableName + " where (col1 = 2 or 
col1 = 3) and col2 >= 2");
+
+    for (int i = 0; i < 3; i++) {
+      assertTrue(res.next());
+      assertEquals(resultRows2.get(res.getDouble(4))[0], res.getInt(2));
+      assertEquals(resultRows2.get(res.getDouble(4))[1], res.getInt(3));
+    }
+
+    res = executeString("SELECT col1, col2, col3 FROM " + tableName);
+    String result = resultSetToString(res);
+    res.close();
+
+    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new 
String[]{"col1", "col2", "col3"},
+      desc.getStats().getNumRows());
+
+    executeString("DROP TABLE " + tableName + " PURGE").close();
+    res.close();
+  }
+
  @Test
  public final void testInsertIntoColumnPartitionedTableByThreeColumns() throws Exception {
    ResultSet res = null;
    String tableName = CatalogUtil.normalizeIdentifier("testInsertIntoColumnPartitionedTableByThreeColumns");

    // Step 1: create the three-column partitioned table and load lineitem once.
    if (nodeType == NodeType.INSERT) {
      res = testBase.execute(
        "create table " + tableName + " (col4 text) partition by column(col1 int4, col2 int4, col3 float8) ");
      res.close();
      TajoTestingCluster cluster = testBase.getTestingCluster();
      CatalogService catalog = cluster.getMaster().getCatalog();
      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));

      res = executeString("insert into " + tableName
        + " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
    } else {
      res = executeString( "create table " + tableName + " (col4 text) "
        + " partition by column(col1 int4, col2 int4, col3 float8) as select l_returnflag, l_orderkey, l_partkey, " +
        "l_quantity from lineitem");
    }
    res.close();

    TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);

    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2", "col3"},
      desc.getStats().getNumRows());

    Path path = new Path(desc.getUri());

    FileSystem fs = FileSystem.get(conf);
    // step=1: only the directories from the initial load should exist.
    verifyDirectoriesForThreeColumns(fs, path, 1);
    if (!testingCluster.isHiveCatalogStoreRunning()) {
      assertEquals(5, desc.getStats().getNumRows().intValue());
    }

    res = executeString("select * from " + tableName + " where col2 = 2");

    // Expected (col1, col2) values keyed by col3 (result column 4).
    Map<Double, int []> resultRows1 = Maps.newHashMap();
    resultRows1.put(45.0d, new int[]{3, 2});
    resultRows1.put(38.0d, new int[]{2, 2});

    for (int i = 0; i < 2; i++) {
      assertTrue(res.next());
      assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(2));
      assertEquals(resultRows1.get(res.getDouble(4))[1], res.getInt(3));
    }
    res.close();

    Map<Double, int []> resultRows2 = Maps.newHashMap();
    resultRows2.put(49.0d, new int[]{3, 3});
    resultRows2.put(45.0d, new int[]{3, 2});
    resultRows2.put(38.0d, new int[]{2, 2});

    res = executeString("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");

    for (int i = 0; i < 3; i++) {
      assertTrue(res.next());
      assertEquals(resultRows2.get(res.getDouble(4))[0], res.getInt(2));
      assertEquals(resultRows2.get(res.getDouble(4))[1], res.getInt(3));
    }
    res.close();

    // Step 2: insert into already exists partitioned table — rows are appended,
    // so every subsequent query sees each row twice.
    res = executeString("insert into " + tableName
        + " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
    res.close();

    desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);

    // TODO: When inserting into already exists partitioned table, table status need to change correctly.
//    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2", "col3"},
//      desc.getStats().getNumRows());

    path = new Path(desc.getUri());

    verifyDirectoriesForThreeColumns(fs, path, 2);
    if (!testingCluster.isHiveCatalogStoreRunning()) {
      assertEquals(5, desc.getStats().getNumRows().intValue());
    }

    // Raw file contents after the duplicate insert: each of the 5 original
    // rows (4 N-flagged at some point in sort order, see below) appears twice.
    String expected = "N\n" +
        "N\n" +
        "N\n" +
        "N\n" +
        "N\n" +
        "N\n" +
        "R\n" +
        "R\n" +
        "R\n" +
        "R\n";

    String tableData = getTableFileContents(new Path(desc.getUri()));
    assertEquals(expected, tableData);

    res = executeString("select * from " + tableName + " where col2 = 2");
    String resultSetData = resultSetToString(res);
    res.close();
    expected = "col4,col1,col2,col3\n" +
        "-------------------------------\n" +
        "N,2,2,38.0\n" +
        "N,2,2,38.0\n" +
        "R,3,2,45.0\n" +
        "R,3,2,45.0\n";
    assertEquals(expected, resultSetData);

    res = executeString("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");
    resultSetData = resultSetToString(res);
    res.close();
    expected = "col4,col1,col2,col3\n" +
        "-------------------------------\n" +
        "N,2,2,38.0\n" +
        "N,2,2,38.0\n" +
        "R,3,2,45.0\n" +
        "R,3,2,45.0\n" +
        "R,3,3,49.0\n" +
        "R,3,3,49.0\n";
    assertEquals(expected, resultSetData);

    // Step 3: overwrite targeting a single partition (col3=30.0) must not
    // remove the other existing partition directories.
    res = executeString("insert overwrite into " + tableName
        + " select l_returnflag, l_orderkey, l_partkey, 30.0 as l_quantity from lineitem "
        + " where l_orderkey = 1 and l_partkey = 1 and  l_linenumber = 1");
    res.close();

    verifyDirectoriesForThreeColumns(fs, path, 3);
    if (!testingCluster.isHiveCatalogStoreRunning()) {
      // TODO: If there is existing another partition directory, we must add its rows number to result row numbers.
      // desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
      // assertEquals(6, desc.getStats().getNumRows().intValue());
    }

    verifyKeptExistingData(res, tableName);

    // Step 4: insert overwrite empty result to partitioned table — an empty
    // overwrite must leave all existing partitions untouched.
    res = executeString("insert overwrite into " + tableName
      + " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem where l_orderkey > 100");
    res.close();

    verifyDirectoriesForThreeColumns(fs, path, 4);
    verifyKeptExistingData(res, tableName);

    executeString("DROP TABLE " + tableName + " PURGE").close();
  }
+
+  private final void verifyKeptExistingData(ResultSet res, String tableName) 
throws Exception {
+    res = executeString("select * from " + tableName + " where col2 = 1");
+    String resultSetData = resultSetToString(res);
+    res.close();
+    String expected = "col4,col1,col2,col3\n" +
+      "-------------------------------\n" +
+      "N,1,1,17.0\n" +
+      "N,1,1,17.0\n" +
+      "N,1,1,30.0\n" +
+      "N,1,1,36.0\n" +
+      "N,1,1,36.0\n";
+
+    assertEquals(expected, resultSetData);
+  }
+
+  private final void verifyDirectoriesForThreeColumns(FileSystem fs, Path 
path, int step) throws Exception {
+    assertTrue(fs.isDirectory(path));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
+
+    if (step == 1 || step == 2) {
+      assertTrue(fs.isDirectory(new Path(path.toUri() + 
"/col1=1/col2=1/col3=17.0")));
+    } else {
+      assertTrue(fs.isDirectory(new Path(path.toUri() + 
"/col1=1/col2=1/col3=17.0")));
+      assertTrue(fs.isDirectory(new Path(path.toUri() + 
"/col1=1/col2=1/col3=30.0")));
+    }
+
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + 
"/col1=2/col2=2/col3=38.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + 
"/col1=3/col2=2/col3=45.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + 
"/col1=3/col2=3/col3=49.0")));
+  }
+
  @Test
  public final void testColumnPartitionedTableByOneColumnsWithCompression() throws Exception {
    ResultSet res = null;
    String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByOneColumnsWithCompression");

    // Text storage with DeflateCodec compression; one partition column.
    if (nodeType == NodeType.INSERT) {
      res = executeString(
        "create table " + tableName + " (col2 int4, col3 float8) USING text " +
          "WITH ('text.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
          "PARTITION BY column(col1 int4)");
      res.close();
      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));

      res = executeString(
        "insert overwrite into " + tableName + " select l_partkey, l_quantity, l_orderkey from lineitem");
    } else {
      res = executeString(
        "create table " + tableName + " (col2 int4, col3 float8) USING text " +
          "WITH ('text.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
          "PARTITION BY column(col1 int4) as select l_partkey, l_quantity, l_orderkey from lineitem");
    }
    res.close();

    TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
    if (!testingCluster.isHiveCatalogStoreRunning()) {
      assertEquals(5, desc.getStats().getNumRows().intValue());
    }

    FileSystem fs = FileSystem.get(conf);
    assertTrue(fs.exists(new Path(desc.getUri())));
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);

    // Partition directories keyed by l_orderkey (1..3).
    Path path = new Path(desc.getUri());
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));

    // Every data file inside every partition must be Deflate-compressed
    // (the codec is inferred from the file extension by the factory).
    for (FileStatus partition : fs.listStatus(path)){
      assertTrue(fs.isDirectory(partition.getPath()));
      for (FileStatus file : fs.listStatus(partition.getPath())) {
        CompressionCodec codec = factory.getCodec(file.getPath());
        assertTrue(codec instanceof DeflateCodec);
      }
    }

    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1"},
      desc.getStats().getNumRows());

    executeString("DROP TABLE " + tableName + " PURGE").close();
  }
+
  @Test
  public final void testColumnPartitionedTableByTwoColumnsWithCompression() throws Exception {
    ResultSet res = null;
    String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByTwoColumnsWithCompression");

    // Text storage with DeflateCodec compression; two nested partition columns.
    if (nodeType == NodeType.INSERT) {
      res = executeString("create table " + tableName + " (col3 float8, col4 text) USING text " +
        "WITH ('text.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
        "PARTITION by column(col1 int4, col2 int4)");
      res.close();

      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));

      res = executeString(
        "insert overwrite into " + tableName +
          " select  l_quantity, l_returnflag, l_orderkey, l_partkey from lineitem");
    } else {
      res = executeString("create table " + tableName + " (col3 float8, col4 text) USING text " +
          "WITH ('text.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
          "PARTITION by column(col1 int4, col2 int4) as select  l_quantity, l_returnflag, l_orderkey, " +
        "l_partkey from lineitem");
    }
    res.close();

    TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
    if (!testingCluster.isHiveCatalogStoreRunning()) {
      assertEquals(5, desc.getStats().getNumRows().intValue());
    }

    FileSystem fs = FileSystem.get(conf);
    assertTrue(fs.exists(new Path(desc.getUri())));
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);

    // Two-level directory layout: col1=<l_orderkey>/col2=<l_partkey>.
    Path path = new Path(desc.getUri());
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));

    // All leaf data files across both partition levels must be Deflate-compressed.
    for (FileStatus partition1 : fs.listStatus(path)){
      assertTrue(fs.isDirectory(partition1.getPath()));
      for (FileStatus partition2 : fs.listStatus(partition1.getPath())) {
        assertTrue(fs.isDirectory(partition2.getPath()));
        for (FileStatus file : fs.listStatus(partition2.getPath())) {
          CompressionCodec codec = factory.getCodec(file.getPath());
          assertTrue(codec instanceof DeflateCodec);
        }
      }
    }

    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2"},
      desc.getStats().getNumRows());

    executeString("DROP TABLE " + tableName + " PURGE").close();
  }
+
  @Test
  public final void testColumnPartitionedTableByThreeColumnsWithCompression() throws Exception {
    ResultSet res = null;
    String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByThreeColumnsWithCompression");

    // Create a deflate-compressed text table partitioned by three columns, populated from lineitem
    // either via INSERT OVERWRITE or via CTAS depending on the parameterized node type.
    if (nodeType == NodeType.INSERT) {
      res = executeString(
        "create table " + tableName + " (col4 text) USING text " +
          "WITH ('text.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
          "partition by column(col1 int4, col2 int4, col3 float8)");
      res.close();

      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));

      res = executeString(
        "insert overwrite into " + tableName +
          " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
    } else {
      res = executeString("create table " + tableName + " (col4 text) USING text " +
          "WITH ('text.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
          "partition by column(col1 int4, col2 int4, col3 float8) as select l_returnflag, l_orderkey, l_partkey, " +
        "l_quantity from lineitem");
    }
    res.close();

    TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
    // HiveCatalogStore does not maintain row statistics, so only check counts otherwise.
    if (!testingCluster.isHiveCatalogStoreRunning()) {
      assertEquals(5, desc.getStats().getNumRows().intValue());
    }

    FileSystem fs = FileSystem.get(conf);
    assertTrue(fs.exists(new Path(desc.getUri())));
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);

    // Each distinct (col1, col2, col3) combination from lineitem must have its own
    // three-level partition directory under the table path.
    Path path = new Path(desc.getUri());
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));

    // Every data file at the leaf level must have been written with DeflateCodec.
    for (FileStatus partition1 : fs.listStatus(path)){
      assertTrue(fs.isDirectory(partition1.getPath()));
      for (FileStatus partition2 : fs.listStatus(partition1.getPath())) {
        assertTrue(fs.isDirectory(partition2.getPath()));
        for (FileStatus partition3 : fs.listStatus(partition2.getPath())) {
          assertTrue(fs.isDirectory(partition3.getPath()));
          for (FileStatus file : fs.listStatus(partition3.getPath())) {
            CompressionCodec codec = factory.getCodec(file.getPath());
            assertTrue(codec instanceof DeflateCodec);
          }
        }
      }
    }

    // Partition pruning on the middle partition column: expect exactly the two rows
    // keyed by their col3 (quantity) value.
    res = executeString("select * from " + tableName + " where col2 = 2");

    Map<Double, int []> resultRows1 = Maps.newHashMap();
    resultRows1.put(45.0d, new int[]{3, 2});
    resultRows1.put(38.0d, new int[]{2, 2});

    int i = 0;
    while (res.next()) {
      assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(2));
      assertEquals(resultRows1.get(res.getDouble(4))[1], res.getInt(3));
      i++;
    }
    res.close();
    assertEquals(2, i);

    Map<Double, int []> resultRows2 = Maps.newHashMap();
    resultRows2.put(49.0d, new int[]{3, 3});
    resultRows2.put(45.0d, new int[]{3, 2});
    resultRows2.put(38.0d, new int[]{2, 2});

    // Pruning with a disjunction on col1 combined with a range predicate on col2.
    res = executeString("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");
    i = 0;
    while(res.next()) {
      assertEquals(resultRows2.get(res.getDouble(4))[0], res.getInt(2));
      assertEquals(resultRows2.get(res.getDouble(4))[1], res.getInt(3));
      i++;
    }

    res.close();
    assertEquals(3, i);

    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2", "col3"},
      desc.getStats().getNumRows());

    executeString("DROP TABLE " + tableName + " PURGE").close();
  }
+
  @Test
  public final void testColumnPartitionedTableNoMatchedPartition() throws Exception {
    ResultSet res = null;
    String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableNoMatchedPartition");

    // Same three-column compressed partition setup as the sibling test; here the focus is
    // querying with a predicate that matches no partition at all.
    if (nodeType == NodeType.INSERT) {
      res = executeString(
        "create table " + tableName + " (col4 text) USING text " +
          "WITH ('text.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
          "partition by column(col1 int4, col2 int4, col3 float8)");
      res.close();

      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));

      res = executeString(
        "insert overwrite into " + tableName +
          " select l_returnflag , l_orderkey, l_partkey, l_quantity from lineitem");
    } else {
      res = executeString("create table " + tableName + " (col4 text) USING text " +
          "WITH ('text.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
          "partition by column(col1 int4, col2 int4, col3 float8) as select l_returnflag , l_orderkey, l_partkey, " +
        "l_quantity from lineitem");
    }
    res.close();

    TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
    // HiveCatalogStore does not maintain row statistics.
    if (!testingCluster.isHiveCatalogStoreRunning()) {
      assertEquals(5, desc.getStats().getNumRows().intValue());
    }

    FileSystem fs = FileSystem.get(conf);
    assertTrue(fs.exists(new Path(desc.getUri())));
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);

    // Verify the expected three-level partition directory layout.
    Path path = new Path(desc.getUri());
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));

    // All leaf data files must be deflate-compressed.
    for (FileStatus partition1 : fs.listStatus(path)){
      assertTrue(fs.isDirectory(partition1.getPath()));
      for (FileStatus partition2 : fs.listStatus(partition1.getPath())) {
        assertTrue(fs.isDirectory(partition2.getPath()));
        for (FileStatus partition3 : fs.listStatus(partition2.getPath())) {
          assertTrue(fs.isDirectory(partition3.getPath()));
          for (FileStatus file : fs.listStatus(partition3.getPath())) {
            CompressionCodec codec = factory.getCodec(file.getPath());
            assertTrue(codec instanceof DeflateCodec);
          }
        }
      }
    }

    // col2 = 9 matches no partition; the scan must return an empty result
    // rather than failing.
    res = executeString("select * from " + tableName + " where col2 = 9");
    assertFalse(res.next());
    res.close();

    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2", "col3"},
      desc.getStats().getNumRows());

    executeString("DROP TABLE " + tableName + " PURGE").close();
  }
+
+  @Test
+  public final void testColumnPartitionedTableWithSmallerExpressions1() throws 
Exception {
+    ResultSet res = null;
+    String tableName = 
CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions1");
+    res = executeString(
+        "create table " + tableName + " (col1 int4, col2 int4, null_col int4) 
partition by column(key float8) ");
+    res.close();
+
+    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+
+    ClientProtos.SubmitQueryResponse response = client.executeQuery("insert 
overwrite into " + tableName
+        + " select l_orderkey, l_partkey from lineitem");
+
+    assertTrue(ReturnStateUtil.isError(response.getState()));
+    assertEquals(response.getState().getMessage(), "INSERT has smaller 
expressions than target columns");
+
+    res = executeFile("case14.sql");
+    assertResultSet(res, "case14.result");
+    res.close();
+
+    TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
+    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new 
String[]{"key"},
+      desc.getStats().getNumRows());
+
+    executeString("DROP TABLE " + tableName + " PURGE").close();
+  }
+
+  @Test
+  public final void testColumnPartitionedTableWithSmallerExpressions2() throws 
Exception {
+    ResultSet res = null;
+    ClientProtos.SubmitQueryResponse response = null;
+    String tableName = 
CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions2");
+
+    if (nodeType == NodeType.INSERT) {
+      res = executeString(
+        "create table " + tableName + " (col1 int4, col2 int4, null_col int4) 
partition by column(key float8) ");
+      res.close();
+
+      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+
+      response = client.executeQuery("insert overwrite into " + tableName
+        + " select l_returnflag , l_orderkey, l_partkey from lineitem");
+
+      assertTrue(ReturnStateUtil.isError(response.getState()));
+      assertEquals(response.getState().getMessage(), "INSERT has smaller 
expressions than target columns");
+
+      res = executeFile("case15.sql");
+      assertResultSet(res, "case15.result");
+      res.close();
+
+      TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
+      verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, 
new String[]{"key"},
+        desc.getStats().getNumRows());
+
+      executeString("DROP TABLE " + tableName + " PURGE").close();
+    }
+  }
+
+
  @Test
  public final void testColumnPartitionedTableWithSmallerExpressions3() throws Exception {
    // Cross-database insert/CTAS round trip: populate testinsertquery1.table1 from
    // default.lineitem, then copy it into testinsertquery2.table1.
    ResultSet res = executeString("create database testinsertquery1;");
    res.close();
    res = executeString("create database testinsertquery2;");
    res.close();

    if (nodeType == NodeType.INSERT) {
      res = executeString("create table testinsertquery1.table1 " +
        "(col1 int4, col2 int4, col3 float8)");
      res.close();

      res = executeString("create table testinsertquery2.table1 " +
        "(col1 int4, col2 int4, col3 float8)");
      res.close();

      CatalogService catalog = testingCluster.getMaster().getCatalog();
      assertTrue(catalog.existsTable("testinsertquery1", "table1"));
      assertTrue(catalog.existsTable("testinsertquery2", "table1"));

      res = executeString("insert overwrite into testinsertquery1.table1 " +
        "select l_orderkey, l_partkey, l_quantity from default.lineitem;");
      res.close();
    } else {
      res = executeString("create table testinsertquery1.table1 " +
        "(col1 int4, col2 int4, col3 float8) as select l_orderkey, l_partkey, l_quantity from default.lineitem;");
      res.close();
    }

    TableDesc desc = catalog.getTableDesc("testinsertquery1", "table1");
    // HiveCatalogStore does not maintain row statistics.
    if (!testingCluster.isHiveCatalogStoreRunning()) {
      assertEquals(5, desc.getStats().getNumRows().intValue());
    }

    // Second hop: copy between databases; both paths must yield the same row count.
    if (nodeType == NodeType.INSERT) {
      res = executeString("insert overwrite into testinsertquery2.table1 " +
        "select col1, col2, col3 from testinsertquery1.table1;");
      res.close();
    } else {
      res = executeString("create table testinsertquery2.table1 " +
        "(col1 int4, col2 int4, col3 float8) as select col1, col2, col3 from testinsertquery1.table1;");
      res.close();
    }
    desc = catalog.getTableDesc("testinsertquery2", "table1");
    if (!testingCluster.isHiveCatalogStoreRunning()) {
      assertEquals(5, desc.getStats().getNumRows().intValue());
    }

    executeString("DROP TABLE testinsertquery1.table1 PURGE").close();
    executeString("DROP TABLE testinsertquery2.table1 PURGE").close();
    executeString("DROP DATABASE testinsertquery1").close();
    executeString("DROP DATABASE testinsertquery2").close();
  }
+
  @Test
  public final void testColumnPartitionedTableWithSmallerExpressions5() throws Exception {
    ResultSet res = null;
    String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions5");

    // Insert into an explicit column list that omits the partition column; the CTAS
    // branch supplies null for the partition column explicitly.
    if (nodeType == NodeType.INSERT) {
      res = executeString(
        "create table " + tableName + " (col1 text) partition by column(col2 text) ");
      res.close();

      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));

      res = executeString("insert overwrite into " + tableName + "(col1) select l_returnflag from lineitem");

    } else {
      res = executeString("create table " + tableName + " (col1 text) partition by column(col2 text) " +
        " as select l_returnflag, null from lineitem");
    }
    res.close();
    res = executeString("select * from " + tableName);
    assertResultSet(res);
    res.close();

    TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col2"},
      desc.getStats().getNumRows());

    executeString("DROP TABLE " + tableName + " PURGE").close();
  }
+
  @Test
  public final void testColumnPartitionedTableWithSmallerExpressions6() throws Exception {
    ResultSet res = null;
    String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions6");

    // Same shape as testColumnPartitionedTableWithSmallerExpressions5, but with a
    // filtered source (l_orderkey = 1) so only a subset of rows is inserted.
    if (nodeType == NodeType.INSERT) {
      res = executeString(
        "create table " + tableName + " (col1 text) partition by column(col2 text) ");
      res.close();

      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));

      res = executeString(
        "insert overwrite into " + tableName + "(col1) select l_returnflag from lineitem where l_orderkey = 1");
    } else {
      res = executeString( "create table " + tableName + " (col1 text) partition by column(col2 text) " +
        " as select l_returnflag, null from lineitem where l_orderkey = 1");
    }
    res.close();

    res = executeString("select * from " + tableName);
    assertResultSet(res);
    res.close();

    TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col2"},
      desc.getStats().getNumRows());

    executeString("DROP TABLE " + tableName + " PURGE").close();
  }
+
+  private MasterPlan getQueryPlan(ResultSet res) {
+    QueryId queryId;
+    if (res instanceof TajoMemoryResultSet) {
+      queryId = ((TajoMemoryResultSet) res).getQueryId();
+    } else {
+      queryId = ((FetchResultSet) res).getQueryId();
+    }
+
+    for (TajoWorker eachWorker : testingCluster.getTajoWorkers()) {
+      QueryMasterTask queryMasterTask = 
eachWorker.getWorkerContext().getQueryMaster().getQueryMasterTask(queryId, 
true);
+      if (queryMasterTask != null) {
+        return queryMasterTask.getQuery().getPlan();
+      }
+    }
+
+    fail("Can't find query from workers" + queryId);
+    return null;
+  }
+
+  @Test
+  public void testScatteredHashShuffle() throws Exception {
+    
testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.varname,
 "2");
+    
testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME.varname,
 "1");
+    try {
+      KeyValueSet tableOptions = new KeyValueSet();
+      tableOptions.set(StorageConstants.TEXT_DELIMITER, 
StorageConstants.DEFAULT_FIELD_DELIMITER);
+      tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
+
+      Schema schema = new Schema();
+      schema.addColumn("col1", TajoDataTypes.Type.TEXT);
+      schema.addColumn("col2", TajoDataTypes.Type.TEXT);
+
+      List<String> data = new ArrayList<String>();
+      int totalBytes = 0;
+      Random rand = new Random(System.currentTimeMillis());
+      String col2Data = 
"Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2"
 +
+          
"Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2"
 +
+          
"Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2";
+
+      int index = 0;
+      while(true) {
+        int col1RandomValue = 1;
+        String str = col1RandomValue + "|col2-" + index + "-" + col2Data;
+        data.add(str);
+
+        totalBytes += str.getBytes().length;
+
+        if (totalBytes > 4 * 1024 * 1024) {
+          break;
+        }
+        index++;
+      }
+
+      TajoTestingCluster.createTable("testscatteredhashshuffle", schema, 
tableOptions, data.toArray(new String[]{}), 3);
+      CatalogService catalog = testingCluster.getMaster().getCatalog();
+      assertTrue(catalog.existsTable("default", "testscatteredhashshuffle"));
+
+      if (nodeType == NodeType.INSERT) {
+        executeString("create table test_partition (col2 text) partition by 
column (col1 text)").close();
+        executeString("insert into test_partition select col2, col1 from 
testscatteredhashshuffle").close();
+      } else {
+        executeString("create table test_partition (col2 text) PARTITION BY 
COLUMN (col1 text) AS select col2, " +
+          "col1 from testscatteredhashshuffle").close();
+      }
+
+      ResultSet res = executeString("select col1 from test_partition");
+
+      int numRows = 0;
+      while (res.next()) {
+        numRows++;
+      }
+      assertEquals(data.size(), numRows);
+
+      TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, 
"test_partition");
+      verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, 
"test_partition", new String[]{"col1"},
+        desc.getStats().getNumRows());
+
+    } finally {
+      
testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.varname,
+          TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.defaultVal);
+      
testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME.varname,
+          TajoConf.ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME.defaultVal);
+      executeString("DROP TABLE test_partition PURGE").close();
+      executeString("DROP TABLE testScatteredHashShuffle PURGE").close();
+    }
+  }
+
  @Test
  public final void TestSpecialCharPartitionKeys1() throws Exception {
    // See - TAJO-947: ColPartitionStoreExec can cause URISyntaxException due to special characters.

    executeDDL("lineitemspecial_ddl.sql", "lineitemspecial.tbl");

    // Partition values contain URI-hostile characters (*, ?, >, <, /, #, %);
    // writing and reading them back must not raise URISyntaxException.
    if (nodeType == NodeType.INSERT) {
      executeString("CREATE TABLE IF NOT EXISTS pTable947 (id int, name text) PARTITION BY COLUMN (type text)")
        .close();
      executeString("INSERT OVERWRITE INTO pTable947 SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial")
        .close();
    } else {
      executeString("CREATE TABLE IF NOT EXISTS pTable947 (id int, name text) PARTITION BY COLUMN (type text)" +
        " AS  SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial")
        .close();
    }

    ResultSet res = executeString("select * from pTable947 where type='RA:*?><I/L#%S' or type='AIR'");

    String resStr = resultSetToString(res);
    String expected =
        "id,name,type\n" +
            "-------------------------------\n"
            + "3,NONE,AIR\n"
            + "3,TEST SPECIAL CHARS,RA:*?><I/L#%S\n";

    assertEquals(expected, resStr);
    cleanupQuery(res);

    executeString("DROP TABLE pTable947 PURGE").close();
  }
+
  @Test
  public final void TestSpecialCharPartitionKeys2() throws Exception {
    // See - TAJO-947: ColPartitionStoreExec can cause URISyntaxException due to special characters.

    executeDDL("lineitemspecial_ddl.sql", "lineitemspecial.tbl");

    if (nodeType == NodeType.INSERT) {
      executeString("CREATE TABLE IF NOT EXISTS pTable948 (id int, name text) PARTITION BY COLUMN (type text)")
        .close();
      executeString("INSERT OVERWRITE INTO pTable948 SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial")
        .close();
    } else {
      executeString("CREATE TABLE IF NOT EXISTS pTable948 (id int, name text) PARTITION BY COLUMN (type text)" +
        " AS SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial")
        .close();
    }

    // Both queries compare results against stored fixture files via assertResultSet;
    // the second predicate references a non-existent partition value ('AIR01').
    ResultSet res = executeString("select * from pTable948 where type='RA:*?><I/L#%S'");
    assertResultSet(res);
    cleanupQuery(res);

    res = executeString("select * from pTable948 where type='RA:*?><I/L#%S' or type='AIR01'");
    assertResultSet(res);
    cleanupQuery(res);

    executeString("DROP TABLE pTable948 PURGE").close();
  }
+
+  @Test
+  public final void testIgnoreFilesInIntermediateDir() throws Exception {
+    // See - TAJO-1219: Files located in intermediate directories of 
partitioned table should be ignored
+    // It verifies that Tajo ignores files located in intermediate directories 
of partitioned table.
+
+    if (nodeType == NodeType.INSERT) {
+      Path testDir = CommonTestingUtil.getTestDir();
+
+      executeString(
+        "CREATE EXTERNAL TABLE testIgnoreFilesInIntermediateDir (col1 int) 
USING CSV PARTITION BY COLUMN (col2 text) " +
+          "LOCATION '" + testDir + "'");
+
+      FileSystem fs = testDir.getFileSystem(conf);
+      FSDataOutputStream fos = fs.create(new Path(testDir, "table1.data"));
+      fos.write("a|b|c".getBytes());
+      fos.close();
+
+      ResultSet res = executeString("select * from 
testIgnoreFilesInIntermediateDir;");
+      assertFalse(res.next());
+      res.close();
+    }
+  }
+
+  /**
+   * Verify added partitions to a table. This would check each partition's 
directory using record of table.
+   *
+   *
+   * @param databaseName
+   * @param tableName
+   * @param partitionColumns
+   * @param numRows
+   * @throws Exception
+   */
+  private void verifyPartitionDirectoryFromCatalog(String databaseName, String 
tableName,
+                                                   String[] partitionColumns, 
Long numRows) throws Exception {
+    int rowCount = 0;
+
+    // Get all partition column values
+    StringBuilder query = new StringBuilder();
+    query.append("SELECT");
+    for (int i = 0; i < partitionColumns.length; i++) {
+      String partitionColumn = partitionColumns[i];
+      if (i > 0) {
+        query.append(",");
+      }
+      query.append(" ").append(partitionColumn);
+    }
+    query.append(" FROM ").append(tableName);
+    ResultSet res = executeString(query.toString());
+
+    StringBuilder partitionName = new StringBuilder();
+    PartitionDescProto partitionDescProto = null;
+
+    // Check whether that partition's directory exist or doesn't exist.
+    while(res.next()) {
+      partitionName.delete(0, partitionName.length());
+
+      for (int i = 0; i < partitionColumns.length; i++) {
+        String partitionColumn = partitionColumns[i];
+        if (i > 0) {
+          partitionName.append("/");
+        }
+        
partitionName.append(partitionColumn).append("=").append(res.getString(partitionColumn));
+      }
+      partitionDescProto = catalog.getPartition(databaseName, tableName, 
partitionName.toString());
+      assertNotNull(partitionDescProto);
+      assertTrue(partitionDescProto.getPath().indexOf(tableName + "/" + 
partitionName.toString()) > 0);
+
+      rowCount++;
+    }
+
+    res.close();
+
+    // Check row count.
+    if (!testingCluster.isHiveCatalogStoreRunning()) {
+      assertEquals(numRows, new Long(rowCount));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java
new file mode 100644
index 0000000..bc643ab
--- /dev/null
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.TajoConstants;
+import org.junit.Test;
+
+import java.sql.ResultSet;
+
/**
 * Integration tests for table subqueries (derived tables in FROM clauses).
 * Each test method runs the SQL fixture resolved from the test-method name via
 * {@code executeQuery()}/{@code executeJsonQuery()} and compares against the
 * stored expected-result file — so the methods must not be renamed or the
 * fixture lookup breaks.
 */
public class TestTableSubQuery extends QueryTestCaseBase {

  public TestTableSubQuery() {
    super(TajoConstants.DEFAULT_DATABASE_NAME);
  }

  @Test
  public final void testTableSubquery1() throws Exception {
    ResultSet res = executeQuery();
    assertResultSet(res);
    cleanupQuery(res);
  }

  @Test
  public final void testGroupBySubQuery() throws Exception {
    ResultSet res = executeQuery();
    assertResultSet(res);
    cleanupQuery(res);
  }

  @Test
  public final void testJoinSubQuery() throws Exception {
    ResultSet res = executeQuery();
    assertResultSet(res);
    cleanupQuery(res);
  }

  @Test
  public final void testJoinSubQuery2() throws Exception {
    ResultSet res = executeQuery();
    assertResultSet(res);
    cleanupQuery(res);
  }

  @Test
  public final void testGroupbySubqueryWithJson() throws Exception {
    ResultSet res = executeJsonQuery();
    assertResultSet(res);
    cleanupQuery(res);
  }

  @Test
  public final void testJoinSubqueryWithJson() throws Exception {
    /*
    SELECT
      A.n_regionkey, B.r_regionkey, A.n_name, B.r_name
    FROM
      (SELECT * FROM nation WHERE n_name LIKE 'A%') A
    JOIN region B ON A.n_regionkey=B.r_regionkey;
    */
    ResultSet res = executeJsonQuery();
    assertResultSet(res);
    cleanupQuery(res);
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTruncateTable.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTruncateTable.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTruncateTable.java
new file mode 100644
index 0000000..3ae0c60
--- /dev/null
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTruncateTable.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.exception.TajoException;
+import org.junit.Test;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+
+public class TestTruncateTable extends QueryTestCaseBase {
+
+  @Test
+  public final void testTruncateTable() throws Exception {
+    try {
+      ResultSet res = executeFile("table1_ddl.sql");
+      res.close();
+      assertTableExists("truncate_table1");
+
+      res = executeString("select * from truncate_table1");
+      int numRows = 0;
+      while (res.next()) {
+        numRows++;
+      }
+      assertEquals(5, numRows);
+      res.close();
+
+      executeString("truncate table truncate_table1");
+      assertTableExists("truncate_table1");
+
+      res = executeString("select * from truncate_table1");
+      numRows = 0;
+      while (res.next()) {
+        numRows++;
+      }
+      assertEquals(0, numRows);
+      res.close();
+    } finally {
+      executeString("DROP TABLE truncate_table1 PURGE");
+    }
+  }
+
+  @Test
+  public final void testTruncateExternalTable() throws TajoException, 
SQLException {
+    try {
+      List<String> createdNames = executeDDL("table2_ddl.sql", 
"truncate_table2", "truncate_table2");
+      assertTableExists(createdNames.get(0));
+
+      ResultSet res = executeString("select * from truncate_table2");
+      int numRows = 0;
+      while (res.next()) {
+        numRows++;
+      }
+      assertEquals(4, numRows);
+      res.close();
+
+      executeString("truncate table truncate_table2");
+      fail("Can't truncate external table");
+    } catch (Exception e) {
+      // succeeded
+      assertTableExists("truncate_table2");
+
+      ResultSet res = executeString("select * from truncate_table2");
+      int numRows = 0;
+      while (res.next()) {
+        numRows++;
+      }
+      assertEquals(4, numRows);
+      res.close();
+    } finally {
+      executeString("DROP TABLE truncate_table2 PURGE");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java
new file mode 100644
index 0000000..21500e9
--- /dev/null
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java
@@ -0,0 +1,693 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.TajoConstants;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.ResultSet;
+
+import static org.junit.Assert.assertEquals;
+
+/*
+ * Notations
+ * - S - select
+ * - SA - select *
+ * - U - union
+ * - G - group by
+ * - O - order by
+ */
+@Category(IntegrationTest.class)
+public class TestUnionQuery extends QueryTestCaseBase {
+
+  public TestUnionQuery() {
+    super(TajoConstants.DEFAULT_DATABASE_NAME);
+  }
+
+  @Test
+  /**
+   * S (SA U SA) O
+   */
+  public final void testUnionAll1() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  /**
+   * S (S U S) O
+   */
+  public final void testUnionAll2() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  /**
+   * S O ((S G) U (S G))
+   */
+  public final void testUnionAll3() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  /**
+   * S G (S G)
+   */
+  public final void testUnionAll4() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  /**
+   * S G (S F G)
+   */
+  public final void testUnionAll5() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  /**
+   * S G (SA)
+   */
+  public final void testUnionAll6() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  /**
+   * S (SA)
+   */
+  public final void testUnionAll7() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnionAll8() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnionAll9() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnionAll10() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnionAll11() throws Exception {
+    // test filter pushdown
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnionAll12() throws Exception {
+    // test filter pushdown
+    // with stage in union query
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnionAll13() throws Exception {
+    // test filter pushdown
+    // with stage in union query
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnionAll14() throws Exception {
+    // test filter pushdown
+    // with group by stage in union query
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnionAll15() throws Exception {
+    // test filter pushdown
+    // with group by out of union query and join in union query
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnionAll16() throws Exception {
+    // test filter pushdown
+    // with count distinct out of union query and join in union query
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  /**
+   * S (SA U SA) O
+   */
+  public final void testUnion1() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  /**
+   * S (S U S) O
+   */
+  public final void testUnion2() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  /**
+   * S O ((S G) U (S G))
+   */
+  public final void testUnion3() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  /**
+   * S G (S G)
+   */
+  public final void testUnion4() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  /**
+   * S G (S F G)
+   */
+  public final void testUnion5() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  /**
+   * S G (SA)
+   */
+  public final void testUnion6() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  /**
+   * S (SA)
+   */
+  public final void testUnion7() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnion8() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnion9() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnion10() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnion11() throws Exception {
+    // test filter pushdown
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnion12() throws Exception {
+    // test filter pushdown
+    // with stage in union query
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnion13() throws Exception {
+    // test filter pushdown
+    // with stage in union query
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnion14() throws Exception {
+    // test filter pushdown
+    // with group by stage in union query
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnion15() throws Exception {
+    // test filter pushdown
+    // with group by out of union query and join in union query
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnion16() throws Exception {
+    // test filter pushdown
+    // with count distinct out of union query and join in union query
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnionAllWithSameAliasNames() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnionAllWithDifferentAlias() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnionAllWithDifferentAliasAndFunction() throws 
Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnionWithSameAliasNames() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnionWithDifferentAlias() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnionWithDifferentAliasAndFunction() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testLeftUnionWithJoin() throws Exception {
+    // https://issues.apache.org/jira/browse/TAJO-881
+    ResultSet res = executeString(
+        "select * from ( " +
+        "  select a.id, b.c_name, a.code from ( " +
+        "    select l_orderkey as id, 'lineitem' as code from lineitem " +
+        "    union all " +
+        "    select o_orderkey as id, 'order' as code from orders " +
+         "  ) a " +
+         "  join customer b on a.id = b.c_custkey" +
+        ") c order by id, code"
+    );
+
+    String expected =
+        "id,c_name,code\n" +
+            "-------------------------------\n" +
+            "1,Customer#000000001,lineitem\n" +
+            "1,Customer#000000001,lineitem\n" +
+            "1,Customer#000000001,order\n" +
+            "2,Customer#000000002,lineitem\n" +
+            "2,Customer#000000002,order\n" +
+            "3,Customer#000000003,lineitem\n" +
+            "3,Customer#000000003,lineitem\n" +
+            "3,Customer#000000003,order\n";
+
+    assertEquals(expected, resultSetToString(res));
+
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testRightUnionWithJoin() throws Exception {
+    // https://issues.apache.org/jira/browse/TAJO-881
+    ResultSet res = executeString(
+            "select * from ( " +
+            "  select a.id, b.c_name, a.code from customer b " +
+            "  join ( " +
+            "    select l_orderkey as id, 'lineitem' as code from lineitem " +
+            "    union all " +
+            "    select o_orderkey as id, 'order' as code from orders " +
+            "  ) a on a.id = b.c_custkey" +
+            ") c order by id, code"
+    );
+
+    String expected =
+        "id,c_name,code\n" +
+            "-------------------------------\n" +
+            "1,Customer#000000001,lineitem\n" +
+            "1,Customer#000000001,lineitem\n" +
+            "1,Customer#000000001,order\n" +
+            "2,Customer#000000002,lineitem\n" +
+            "2,Customer#000000002,order\n" +
+            "3,Customer#000000003,lineitem\n" +
+            "3,Customer#000000003,lineitem\n" +
+            "3,Customer#000000003,order\n";
+
+    assertEquals(expected, resultSetToString(res));
+
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testAllUnionWithJoin() throws Exception {
+    // https://issues.apache.org/jira/browse/TAJO-881
+    ResultSet res = executeString(
+        "select * from ( " +
+        "  select a.id, a.code as code, b.name, b.code as code2 from ( " +
+        "    select l_orderkey as id, 'lineitem' as code from lineitem " +
+        "    union all " +
+        "    select o_orderkey as id, 'order' as code from orders " +
+        "  ) a " +
+        "  join ( " +
+        "    select c_custkey as id, c_name as name, 'customer' as code from 
customer " +
+        "    union all " +
+        "    select p_partkey as id, p_name as name, 'part' as code from part 
" +
+        "  ) b on a.id = b.id" +
+        ") c order by id, code, code2"
+    );
+
+    String expected =
+        "id,code,name,code2\n" +
+            "-------------------------------\n" +
+            "1,lineitem,Customer#000000001,customer\n" +
+            "1,lineitem,Customer#000000001,customer\n" +
+            "1,lineitem,goldenrod lavender spring chocolate lace,part\n" +
+            "1,lineitem,goldenrod lavender spring chocolate lace,part\n" +
+            "1,order,Customer#000000001,customer\n" +
+            "1,order,goldenrod lavender spring chocolate lace,part\n" +
+            "2,lineitem,Customer#000000002,customer\n" +
+            "2,lineitem,blush thistle blue yellow saddle,part\n" +
+            "2,order,Customer#000000002,customer\n" +
+            "2,order,blush thistle blue yellow saddle,part\n" +
+            "3,lineitem,Customer#000000003,customer\n" +
+            "3,lineitem,Customer#000000003,customer\n" +
+            "3,lineitem,spring green yellow purple cornsilk,part\n" +
+            "3,lineitem,spring green yellow purple cornsilk,part\n" +
+            "3,order,Customer#000000003,customer\n" +
+            "3,order,spring green yellow purple cornsilk,part\n";
+
+    assertEquals(expected, resultSetToString(res));
+
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testUnionWithCrossJoin() throws Exception {
+    // https://issues.apache.org/jira/browse/TAJO-881
+    ResultSet res = executeString(
+        "select * from ( " +
+        "  select a.id, b.c_name, a.code from ( " +
+            "    select l_orderkey as id, 'lineitem' as code from lineitem " +
+            "    union all " +
+            "    select o_orderkey as id, 'order' as code from orders " +
+            "  ) a, " +
+            "  customer b " +
+        ") c order by id, code, c_name"
+    );
+
+    String expected =
+        "id,c_name,code\n" +
+            "-------------------------------\n" +
+            "1,Customer#000000001,lineitem\n" +
+            "1,Customer#000000001,lineitem\n" +
+            "1,Customer#000000002,lineitem\n" +
+            "1,Customer#000000002,lineitem\n" +
+            "1,Customer#000000003,lineitem\n" +
+            "1,Customer#000000003,lineitem\n" +
+            "1,Customer#000000004,lineitem\n" +
+            "1,Customer#000000004,lineitem\n" +
+            "1,Customer#000000005,lineitem\n" +
+            "1,Customer#000000005,lineitem\n" +
+            "1,Customer#000000001,order\n" +
+            "1,Customer#000000002,order\n" +
+            "1,Customer#000000003,order\n" +
+            "1,Customer#000000004,order\n" +
+            "1,Customer#000000005,order\n" +
+            "2,Customer#000000001,lineitem\n" +
+            "2,Customer#000000002,lineitem\n" +
+            "2,Customer#000000003,lineitem\n" +
+            "2,Customer#000000004,lineitem\n" +
+            "2,Customer#000000005,lineitem\n" +
+            "2,Customer#000000001,order\n" +
+            "2,Customer#000000002,order\n" +
+            "2,Customer#000000003,order\n" +
+            "2,Customer#000000004,order\n" +
+            "2,Customer#000000005,order\n" +
+            "3,Customer#000000001,lineitem\n" +
+            "3,Customer#000000001,lineitem\n" +
+            "3,Customer#000000002,lineitem\n" +
+            "3,Customer#000000002,lineitem\n" +
+            "3,Customer#000000003,lineitem\n" +
+            "3,Customer#000000003,lineitem\n" +
+            "3,Customer#000000004,lineitem\n" +
+            "3,Customer#000000004,lineitem\n" +
+            "3,Customer#000000005,lineitem\n" +
+            "3,Customer#000000005,lineitem\n" +
+            "3,Customer#000000001,order\n" +
+            "3,Customer#000000002,order\n" +
+            "3,Customer#000000003,order\n" +
+            "3,Customer#000000004,order\n" +
+            "3,Customer#000000005,order\n";
+
+    assertEquals(expected, resultSetToString(res));
+
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testThreeJoinInUnion() throws Exception {
+    // https://issues.apache.org/jira/browse/TAJO-881
+    ResultSet res = executeString(
+      "select orders.o_orderkey \n" +
+          "from orders\n" +
+          "join lineitem on orders.o_orderkey = lineitem.l_orderkey\n" +
+          "join customer on orders.o_custkey =  customer.c_custkey\n" +
+          "union all \n" +
+          "select nation.n_nationkey from nation"
+    );
+    String expected =
+        "o_orderkey\n" +
+            "-------------------------------\n" +
+            "1\n" +
+            "1\n" +
+            "2\n" +
+            "3\n" +
+            "3\n" +
+            "0\n" +
+            "1\n" +
+            "2\n" +
+            "3\n" +
+            "4\n" +
+            "5\n" +
+            "6\n" +
+            "7\n" +
+            "8\n" +
+            "9\n" +
+            "10\n" +
+            "11\n" +
+            "12\n" +
+            "13\n" +
+            "14\n" +
+            "15\n" +
+            "16\n" +
+            "17\n" +
+            "18\n" +
+            "19\n" +
+            "20\n" +
+            "21\n" +
+            "22\n" +
+            "23\n" +
+            "24\n";
+
+    assertEquals(expected, resultSetToString(res));
+
+    cleanupQuery(res);
+  }
+
+  @Test
+  public void testUnionCaseOfFirstEmptyAndJoin() throws Exception {
+    ResultSet res = executeString(
+        "select a.c_custkey, b.c_custkey from " +
+            "  (select c_custkey, c_nationkey from customer where c_nationkey 
< 0 " +
+            "   union all " +
+            "   select c_custkey, c_nationkey from customer where c_nationkey 
> 0 " +
+            ") a " +
+            "left outer join customer b " +
+            "on a.c_custkey = b.c_custkey "
+    );
+
+    String expected =
+        "c_custkey,c_custkey\n" +
+            "-------------------------------\n" +
+            "1,1\n" +
+            "2,2\n" +
+            "3,3\n" +
+            "4,4\n" +
+            "5,5\n";
+
+    assertEquals(expected, resultSetToString(res));
+    res.close();
+  }
+
+  @Test
+  public void testTajo1368Case1() throws Exception {
+    ResultSet res = executeString(
+        "select * from " +
+            "   (select c_custkey, c_nationkey from customer where c_nationkey 
< 0 " +
+            "   union all " +
+            "   select c_custkey, c_nationkey from customer where c_nationkey 
> 0 " +
+            ") a " +
+            "union all " +
+            "select * from " +
+            "   (select c_custkey, c_nationkey from customer where c_nationkey 
< 0 " +
+            "   union all " +
+            "   select c_custkey, c_nationkey from customer where c_nationkey 
> 0 " +
+            ") b ");
+
+    String expected = "c_custkey,c_nationkey\n" +
+        "-------------------------------\n" +
+        "1,15\n" +
+        "2,13\n" +
+        "3,1\n" +
+        "4,4\n" +
+        "5,3\n" +
+        "1,15\n" +
+        "2,13\n" +
+        "3,1\n" +
+        "4,4\n" +
+        "5,3\n";
+
+    assertEquals(expected, resultSetToString(res));
+    res.close();
+  }
+
+  @Test
+  public void testTajo1368Case2() throws Exception {
+    ResultSet res = executeString("select * from ( "+
+        "select c_custkey, c_nationkey from ( " +
+        "select c_custkey, c_nationkey from ( " +
+        "select c_custkey, c_nationkey from customer) a " +
+        "union all " +
+        "select c_custkey, c_nationkey from ( " +
+        "select c_custkey, c_nationkey from customer) a " +
+        " ) a " +
+        " ) a ");
+
+    String expected = "c_custkey,c_nationkey\n" +
+        "-------------------------------\n" +
+        "1,15\n" +
+        "2,13\n" +
+        "3,1\n" +
+        "4,4\n" +
+        "5,3\n" +
+        "1,15\n" +
+        "2,13\n" +
+        "3,1\n" +
+        "4,4\n" +
+        "5,3\n";
+
+    assertEquals(expected, resultSetToString(res));
+    res.close();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true)
+  @SimpleTest
+  public void testComplexUnion1() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true)
+  @SimpleTest
+  public void testComplexUnion2() throws Exception {
+    runSimpleTests();
+  }
+}

Reply via email to