Repository: tajo Updated Branches: refs/heads/master f21d5d677 -> 45ca4993d
TAJO-1783: Query result is not returned by invalid output path. Closes #701 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/45ca4993 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/45ca4993 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/45ca4993 Branch: refs/heads/master Commit: 45ca4993db3659206846b8815ae9a4c091ca0c36 Parents: f21d5d6 Author: Jinho Kim <[email protected]> Authored: Sat Aug 22 00:33:09 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Sat Aug 22 00:33:09 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../tajo/engine/query/TestHBaseTable.java | 14 +- .../tajo/engine/query/TestSimpleQuery.java | 179 +++++++++++++++++++ .../results/TestSimpleQuery/testLimit.result | 3 + .../results/TestSimpleQuery/testNoWhere.result | 7 + .../testPartitionColumnWhere.result | 4 + .../results/TestSimpleQuery/testWhere.result | 4 + .../exec/NonForwardQueryResultFileScanner.java | 94 +++------- .../exec/NonForwardQueryResultScanner.java | 10 +- .../apache/tajo/master/exec/QueryExecutor.java | 15 +- .../ws/rs/resources/QueryResultResource.java | 3 +- .../org/apache/tajo/plan/expr/EvalTreeUtil.java | 34 +--- .../plan/rewrite/rules/AccessPathRewriter.java | 1 + .../rewrite/rules/PartitionedTableRewriter.java | 1 + .../org/apache/tajo/plan/util/PlannerUtil.java | 23 +-- .../org/apache/tajo/storage/Tablespace.java | 12 -- .../tajo/storage/hbase/HBaseTablespace.java | 83 --------- .../org/apache/tajo/storage/FileTablespace.java | 145 --------------- 18 files changed, 252 insertions(+), 382 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/45ca4993/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 8653bbc..d837e82 100644 --- a/CHANGES +++ b/CHANGES @@ -232,6 +232,8 @@ Release 0.11.0 - unreleased BUG FIXES + TAJO-1783: Query result is not returned by invalid output path. (jinho) + TAJO-1596: TestPythonFunctions occasionally fails. (jinho) TAJO-1741: Two tables having same time zone display different timestamps. http://git-wip-us.apache.org/repos/asf/tajo/blob/45ca4993/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java index 4ff5e3e..8642331 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java @@ -572,16 +572,16 @@ public class TestHBaseTable extends QueryTestCaseBase { @Test public void testNonForwardQuery() throws Exception { - executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int) " + - "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:#b', " + + executeString("CREATE TABLE hbase_mapped_table1 (rk text, col1 text, col2 text, col3 int) " + + "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table1', 'columns'=':key,col1:a,col2:,col3:#b', " + "'hbase.split.rowkeys'='010,040,060,080')").close(); - assertTableExists("hbase_mapped_table"); + assertTableExists("hbase_mapped_table1"); HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf()); HTable htable = null; try { - hAdmin.tableExists("hbase_table"); - htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + hAdmin.tableExists("hbase_table1"); + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table1"); org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys(); assertEquals(5, keys.getFirst().length); @@ -596,11 +596,11 @@ public class TestHBaseTable extends QueryTestCaseBase { htable.put(put); } - ResultSet res = executeString("select * from hbase_mapped_table"); + ResultSet res = executeString("select * from hbase_mapped_table1"); assertResultSet(res); res.close(); } finally { - executeString("DROP TABLE hbase_mapped_table PURGE").close(); + executeString("DROP TABLE hbase_mapped_table1 PURGE").close(); hAdmin.close(); if (htable == null) { htable.close(); http://git-wip-us.apache.org/repos/asf/tajo/blob/45ca4993/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSimpleQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSimpleQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSimpleQuery.java new file mode 100644 index 0000000..4c18097 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSimpleQuery.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.query; + +import org.apache.tajo.*; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.client.QueryStatus; +import org.apache.tajo.client.TajoClientUtil; +import org.apache.tajo.exception.QueryNotFoundException; +import org.apache.tajo.ipc.ClientProtos; +import org.apache.tajo.master.GlobalEngine; +import org.apache.tajo.master.QueryInfo; +import org.apache.tajo.master.QueryManager; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.logical.NodeType; +import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.session.Session; +import org.apache.tajo.util.StringUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.sql.ResultSet; +import java.util.Arrays; +import java.util.Collection; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class TestSimpleQuery extends QueryTestCaseBase { + private static String table; + private static String partitionedTable; + private NodeType nodeType; + private String testTable; + + public TestSimpleQuery(NodeType nodeType) throws IOException { + super(TajoConstants.DEFAULT_DATABASE_NAME); + this.nodeType = nodeType; + if (nodeType == NodeType.SCAN) { + testTable = table; + } else if (nodeType == NodeType.PARTITIONS_SCAN) { + testTable = partitionedTable; + } + } + + @Parameterized.Parameters + public static Collection<Object[]> generateParameters() { + return Arrays.asList(new Object[][]{ + //type + {NodeType.SCAN}, + {NodeType.PARTITIONS_SCAN}, + }); + } + + @BeforeClass + public static void setupClass() throws Exception { + createTestTable(); + } + + @AfterClass + public static void tearDownClass() throws Exception { + client.dropTable(table, true); + client.dropTable(partitionedTable, true); + } + + private static void createTestTable() throws Exception { + partitionedTable = CatalogUtil.normalizeIdentifier("TestSimpleQuery_Partitioned"); + client.executeQueryAndGetResult("create table " + partitionedTable + + " (col4 text) partition by column(col1 int4, col2 int4, col3 float8) " + + "as select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem"); + + table = CatalogUtil.normalizeIdentifier("TestSimpleQuery"); + client.executeQueryAndGetResult("create table " + table + + " (col4 text, col1 int4, col2 int4, col3 float8) " + + "as select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem"); + } + + @Test + public final void testNoWhere() throws Exception { + String query = "select * from " + testTable; + + isSimpleQuery(query, true); + hasQueryMaster(query, false); + ResultSet res = executeString(query); + assertResultSet(res); + cleanupQuery(res); + } + + @Test + public final void testLimit() throws Exception { + String query = "select * from " + testTable + " limit 1"; + + isSimpleQuery(query, true); + hasQueryMaster(query, false); + ResultSet res = executeString(query); + assertResultSet(res); + cleanupQuery(res); + } + + @Test + public final void testWhere() throws Exception { + String query = "select * from " + testTable + " where col4 = 'R' and col1 = 3"; + + isSimpleQuery(query, false); + hasQueryMaster(query, true); + ResultSet res = executeString(query); + assertResultSet(res); + cleanupQuery(res); + } + + @Test + public final void testPartitionColumnWhere() throws Exception { + String query = "select * from " + testTable + " where col1 = 1 and (col3 = 36.0 or col3 = 17.0) "; + + if (nodeType == NodeType.SCAN) { + isSimpleQuery(query, false); + hasQueryMaster(query, true); + } else { + isSimpleQuery(query, true); + hasQueryMaster(query, false); + } + + ResultSet res = executeString(query); + assertResultSet(res); + cleanupQuery(res); + } + + private void isSimpleQuery(String queryStr, boolean expected) throws Exception { + GlobalEngine globalEngine = testingCluster.getMaster().getContext().getGlobalEngine(); + + QueryContext queryContext = LocalTajoTestingUtility.createDummyContext(conf); + Session session = testingCluster.getMaster().getContext().getSessionManager().getSession(client.getSessionId()); + LogicalPlan plan = globalEngine.getLogicalPlanner(). + createPlan(queryContext, globalEngine.buildExpressionFromSql(queryStr, session)); + + globalEngine.getLogicalOptimizer().optimize(plan); + assertEquals(expected, PlannerUtil.checkIfSimpleQuery(plan)); + } + + private void hasQueryMaster(String queryStr, boolean expected) throws QueryNotFoundException { + ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr); + QueryId queryId = new QueryId(res.getQueryId()); + + QueryManager queryManager = testingCluster.getMaster().getContext().getQueryJobManager(); + if (expected) { + assertEquals(ClientProtos.SubmitQueryResponse.ResultType.FETCH, res.getResultType()); + QueryStatus status = TajoClientUtil.waitCompletion(client, queryId); + assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, status.getState()); + client.closeQuery(queryId); + } else { + assertEquals(ClientProtos.SubmitQueryResponse.ResultType.ENCLOSED, res.getResultType()); + QueryInfo queryInfo = queryManager.getFinishedQuery(queryId); + assertNotNull(queryInfo); + assertTrue(StringUtils.isEmpty(queryInfo.getQueryMasterHost())); + client.closeQuery(queryId); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/45ca4993/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testLimit.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testLimit.result b/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testLimit.result new file mode 100644 index 0000000..15e3a92 --- /dev/null +++ b/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testLimit.result @@ -0,0 +1,3 @@ +col4,col1,col2,col3 +------------------------------- +N,1,1,17.0 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/45ca4993/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testNoWhere.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testNoWhere.result b/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testNoWhere.result new file mode 100644 index 0000000..ba5c0ea --- /dev/null +++ b/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testNoWhere.result @@ -0,0 +1,7 @@ +col4,col1,col2,col3 +------------------------------- +N,1,1,17.0 +N,1,1,36.0 +N,2,2,38.0 +R,3,2,45.0 +R,3,3,49.0 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/45ca4993/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testPartitionColumnWhere.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testPartitionColumnWhere.result b/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testPartitionColumnWhere.result new file mode 100644 index 0000000..c38334d --- /dev/null +++ b/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testPartitionColumnWhere.result @@ -0,0 +1,4 @@ +col4,col1,col2,col3 +------------------------------- +N,1,1,17.0 +N,1,1,36.0 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/45ca4993/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testWhere.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testWhere.result b/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testWhere.result new file mode 100644 index 0000000..9f337a1 --- /dev/null +++ b/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testWhere.result @@ -0,0 +1,4 @@ +col4,col1,col2,col3 +------------------------------- +R,3,2,45.0 +R,3,3,49.0 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/45ca4993/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java index ec8760f..877e32b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java @@ -18,26 +18,27 @@ package org.apache.tajo.master.exec; +import com.google.common.collect.Lists; import com.google.protobuf.ByteString; - import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryId; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TaskId; -import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.plan.expr.EvalTreeUtil; -import org.apache.tajo.plan.logical.ScanNode; -import org.apache.tajo.engine.planner.physical.SeqScanExec; +import org.apache.tajo.engine.planner.physical.PartitionMergeScanExec; +import org.apache.tajo.engine.planner.physical.ScanExec; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.querymaster.Repartitioner; import org.apache.tajo.storage.*; import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; -import org.apache.tajo.util.StringUtils; +import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -45,11 +46,10 @@ import java.util.ArrayList; import java.util.List; public class NonForwardQueryResultFileScanner implements NonForwardQueryResultScanner { - private static final int MAX_FRAGMENT_NUM_PER_SCAN = 100; - + private QueryId queryId; private String sessionId; - private SeqScanExec scanExec; + private ScanExec scanExec; private TableDesc tableDesc; private RowStoreEncoder rowEncoder; private int maxRow; @@ -57,8 +57,6 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc private TaskAttemptContext taskContext; private TajoConf tajoConf; private ScanNode scanNode; - - private int currentFragmentIndex = 0; public NonForwardQueryResultFileScanner(TajoConf tajoConf, String sessionId, QueryId queryId, ScanNode scanNode, TableDesc tableDesc, int maxRow) throws IOException { @@ -71,56 +69,29 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc this.rowEncoder = RowStoreUtil.createEncoder(tableDesc.getLogicalSchema()); } - public void init() throws IOException { + public void init() throws IOException, TajoException { initSeqScanExec(); } - /** - * Set partition path and depth if ScanNode's qualification exists - * - * @param tablespace target storage manager to be set with partition info - */ - private void setPartition(Tablespace tablespace) { - if (tableDesc.isExternal() && tableDesc.hasPartition() && scanNode.getQual() != null && - tablespace instanceof FileTablespace) { - StringBuffer path = new StringBuffer(); - int depth = 0; - if (tableDesc.hasPartition()) { - for (Column c : tableDesc.getPartitionMethod().getExpressionSchema().getRootColumns()) { - String partitionValue = EvalTreeUtil.getPartitionValue(scanNode.getQual(), c.getSimpleName()); - if (partitionValue == null) - break; - path.append(String.format("/%s=%s", c.getSimpleName(), StringUtils.escapePathName(partitionValue))); - depth++; - } - } - ((FileTablespace) tablespace).setPartitionPath(path.toString()); - ((FileTablespace) tablespace).setCurrentDepth(depth); - scanNode.setQual(null); - } - } - - private void initSeqScanExec() throws IOException { + private void initSeqScanExec() throws IOException, TajoException { Tablespace tablespace = TablespaceManager.get(tableDesc.getUri()).get(); - List<Fragment> fragments = null; - setPartition(tablespace); - fragments = tablespace.getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN); - if (fragments != null && !fragments.isEmpty()) { - FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[] {})); + List<Fragment> fragments = Lists.newArrayList(); + if (tableDesc.hasPartition()) { + FileTablespace fileTablespace = TUtil.checkTypeAndGet(tablespace, FileTablespace.class); + fragments.addAll(Repartitioner.getFragmentsFromPartitionedTable(fileTablespace, scanNode, tableDesc)); + } else { + fragments.addAll(tablespace.getSplits(tableDesc.getName(), tableDesc, scanNode)); + } + + if (!fragments.isEmpty()) { + FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[]{})); this.taskContext = new TaskAttemptContext( - new QueryContext(tajoConf), null, - new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0), + new QueryContext(tajoConf), null, + new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0), fragmentProtos, null); - try { - // scanNode must be clone cause SeqScanExec change target in the case of - // a partitioned table. - scanExec = new SeqScanExec(taskContext, (ScanNode) scanNode.clone(), fragmentProtos); - } catch (CloneNotSupportedException e) { - throw new IOException(e.getMessage(), e); - } + scanExec = new PartitionMergeScanExec(taskContext, scanNode, fragmentProtos); scanExec.init(); - currentFragmentIndex += fragments.size(); } } @@ -132,10 +103,6 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc return sessionId; } - public void setScanExec(SeqScanExec scanExec) { - this.scanExec = scanExec; - } - public TableDesc getTableDesc() { return tableDesc; } @@ -163,18 +130,9 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc if (tuple == null) { scanExec.close(); scanExec = null; - initSeqScanExec(); - if (scanExec != null) { - tuple = scanExec.next(); - } - if (tuple == null) { - if (scanExec != null) { - scanExec.close(); - scanExec = null; - } - break; - } + break; } + rows.add(ByteString.copyFrom((rowEncoder.toBytes(tuple)))); rowCount++; currentNumRows++; http://git-wip-us.apache.org/repos/asf/tajo/blob/45ca4993/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java index 75608df..a104c99 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java @@ -18,14 +18,14 @@ package org.apache.tajo.master.exec; -import java.io.IOException; -import java.util.List; - +import com.google.protobuf.ByteString; import org.apache.tajo.QueryId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.exception.TajoException; -import com.google.protobuf.ByteString; +import java.io.IOException; +import java.util.List; public interface NonForwardQueryResultScanner { @@ -41,7 +41,7 @@ public interface NonForwardQueryResultScanner { public TableDesc getTableDesc(); - public void init() throws IOException; + public void init() throws IOException, TajoException; public int getCurrentRowNumber(); http://git-wip-us.apache.org/repos/asf/tajo/blob/45ca4993/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 207f91b..39013df 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -53,9 +53,7 @@ import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager; import org.apache.tajo.master.exec.prehook.InsertIntoHook; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.Target; -import org.apache.tajo.plan.expr.EvalContext; -import org.apache.tajo.plan.expr.EvalNode; -import org.apache.tajo.plan.expr.GeneralFunctionEval; +import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.function.python.PythonScriptEngine; import org.apache.tajo.plan.function.python.TajoScriptEngine; import org.apache.tajo.plan.logical.*; @@ -249,15 +247,12 @@ public class QueryExecutor { public void execSimpleQuery(QueryContext queryContext, Session session, String query, LogicalPlan plan, SubmitQueryResponse.Builder response) throws Exception { ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN); - if (scanNode == null) { - scanNode = plan.getRootBlock().getNode(NodeType.PARTITIONS_SCAN); - } TableDesc desc = scanNode.getTableDesc(); - // Keep info for partition-column-only queries - SelectionNode selectionNode = plan.getRootBlock().getNode(NodeType.SELECTION); - if (desc.isExternal() && desc.hasPartition() && selectionNode != null) { - scanNode.setQual(selectionNode.getQual()); + + if (desc.hasPartition()) { + scanNode = plan.getRootBlock().getNode(NodeType.PARTITIONS_SCAN); } + int maxRow = Integer.MAX_VALUE; if (plan.getRootBlock().hasNode(NodeType.LIMIT)) { LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT); http://git-wip-us.apache.org/repos/asf/tajo/blob/45ca4993/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java index 40e3f25..2efbfdd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.tajo.QueryId; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.error.Errors.ResultCode; +import org.apache.tajo.exception.TajoException; import org.apache.tajo.master.QueryInfo; import org.apache.tajo.master.TajoMaster.MasterContext; import org.apache.tajo.master.exec.NonForwardQueryResultFileScanner; @@ -110,7 +111,7 @@ public class QueryResultResource { private static NonForwardQueryResultScanner getNonForwardQueryResultScanner( MasterContext masterContext, Session session, - QueryId queryId) throws IOException { + QueryId queryId) throws IOException, TajoException { NonForwardQueryResultScanner resultScanner = session.getNonForwardQueryResultScanner(queryId); if (resultScanner == null) { QueryInfo queryInfo = masterContext.getQueryJobManager().getFinishedQuery(queryId); http://git-wip-us.apache.org/repos/asf/tajo/blob/45ca4993/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java index 89fd81b..cf00534 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java @@ -599,41 +599,11 @@ public class EvalTreeUtil { } else if (left instanceof ConstEval && right instanceof FieldEval && partSchema.contains(((FieldEval) right).getColumnName())) { return true; } - } else if (type == EvalType.AND && left instanceof BinaryEval && right instanceof BinaryEval) { + } else if ((type == EvalType.AND || type == EvalType.OR) + && left instanceof BinaryEval && right instanceof BinaryEval) { return checkIfPartitionSelection(left, partSchema) && checkIfPartitionSelection(right, partSchema); } } return false; } - - /** - * Get partition constant value associated with `columnName`. - * - * @param node EvalNode having query predicates - * @param columnName Column name to be looked up - * @return String The value associated with `columnName` in the predicates - */ - public static String getPartitionValue(EvalNode node, String columnName) { - if (node != null && node instanceof BinaryEval) { - BinaryEval eval = (BinaryEval)node; - EvalNode left = eval.getLeftExpr(); - EvalNode right = eval.getRightExpr(); - EvalType type = eval.getType(); - - if (type == EvalType.EQUAL) { - if (left instanceof FieldEval && right instanceof ConstEval && columnName.equals(((FieldEval) left).getColumnName())) { - return ((ConstEval)right).getValue().toString(); - } else if (left instanceof ConstEval && right instanceof FieldEval && columnName.equals(((FieldEval) right).getColumnName())) { - return ((ConstEval)left).getValue().toString(); - } - } else if (type == EvalType.AND && left instanceof BinaryEval && right instanceof BinaryEval) { - String value = getPartitionValue(left, columnName); - if (value == null) { - value = getPartitionValue(right, columnName); - } - return value; - } - } - return null; - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/45ca4993/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/AccessPathRewriter.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/AccessPathRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/AccessPathRewriter.java index afabe7a..33ce4f4 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/AccessPathRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/AccessPathRewriter.java @@ -122,6 +122,7 @@ public class AccessPathRewriter implements LogicalPlanRewriteRule { } else { PlannerUtil.replaceNode(plan, stack.peek(), scanNode, indexScanNode); } + block.registerNode(indexScanNode); } return null; } http://git-wip-us.apache.org/repos/asf/tajo/blob/45ca4993/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java index 244d385..b5cd42b 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java @@ -419,6 +419,7 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule { } else { PlannerUtil.replaceNode(plan, stack.peek(), scanNode, rewrittenScanNode); } + block.registerNode(rewrittenScanNode); } catch (IOException e) { throw new TajoInternalError("Partitioned Table Rewrite Failed: \n" + e.getMessage()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/45ca4993/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java index 8e0c421..a9dca4c 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java @@ -119,7 +119,7 @@ public class PlannerUtil { PlannerUtil.getRelationLineage(plan.getRootBlock().getRoot()).length == 1; boolean noComplexComputation = false; - boolean prefixPartitionWhere = false; + boolean partitionWhere = false; if (singleRelation) { ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN); if (scanNode == null) { @@ -154,33 +154,18 @@ public class PlannerUtil { } } - /** - * TODO: Remove isExternal check after resolving the following issues - * - TAJO-1416: INSERT INTO EXTERNAL PARTITIONED TABLE - * - TAJO-1441: INSERT INTO MANAGED PARTITIONED TABLE - */ - if (!noWhere && scanNode.getTableDesc().isExternal() && scanNode.getTableDesc().getPartitionMethod() != null) { + if (!noWhere && scanNode.getTableDesc().hasPartition()) { EvalNode node = ((SelectionNode) plan.getRootBlock().getNode(NodeType.SELECTION)).getQual(); Schema partSchema = scanNode.getTableDesc().getPartitionMethod().getExpressionSchema(); if (EvalTreeUtil.checkIfPartitionSelection(node, partSchema)) { - prefixPartitionWhere = true; - boolean isPrefix = true; - for (Column c : partSchema.getRootColumns()) { - String value = EvalTreeUtil.getPartitionValue(node, c.getSimpleName()); - if (isPrefix && value == null) - isPrefix = false; - else if (!isPrefix && value != null) { - prefixPartitionWhere = false; - break; - } - } + partitionWhere = true; } } } return !checkIfDDLPlan(rootNode) && (simpleOperator && noComplexComputation && isOneQueryBlock && - noOrderBy && noGroupBy && (noWhere || prefixPartitionWhere) && noJoin && singleRelation); + noOrderBy && noGroupBy && (noWhere || partitionWhere) && noJoin && singleRelation); } /** http://git-wip-us.apache.org/repos/asf/tajo/blob/45ca4993/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java index 09d8deb..d131d9e 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java @@ -132,18 +132,6 @@ public abstract class Tablespace { ScanNode scanNode) throws IOException, TajoException; /** - * It returns the splits that will serve as input for the non-forward query scanner such as 'select * from table1'. - * The result list should be small. If there is many fragments for scanning, TajoMaster uses the paging navigation. - * @param tableDesc The table description for the target data. - * @param currentPage The current page number within the entire list. - * @param numFragments The number of fragments in the result. - * @return The list of input fragments. - * @throws java.io.IOException - */ - public abstract List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments) - throws IOException; - - /** * It returns the storage property. * @return The storage property */ http://git-wip-us.apache.org/repos/asf/tajo/blob/45ca4993/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java index f613b88..2204923 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java @@ -599,89 +599,6 @@ public class HBaseTablespace extends Tablespace { } } - @Override - public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments) - throws IOException { - HTable htable = null; - HBaseAdmin hAdmin = null; - try { - htable = new HTable(hbaseConf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY)); - - org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys(); - if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { - return new ArrayList<Fragment>(1); - } - hAdmin = new HBaseAdmin(hbaseConf); - Map<ServerName, ServerLoad> serverLoadMap = new HashMap<ServerName, ServerLoad>(); - - List<Fragment> fragments = new ArrayList<Fragment>(keys.getFirst().length); - - int start = currentPage * numFragments; - if (start >= keys.getFirst().length) { - return new ArrayList<Fragment>(1); - } - int end = (currentPage + 1) * numFragments; - if (end > keys.getFirst().length) { - end = keys.getFirst().length; - } - for (int i = start; i < end; i++) { - HRegionLocation location = htable.getRegionLocation(keys.getFirst()[i], false); - - String regionName = location.getRegionInfo().getRegionNameAsString(); - ServerLoad serverLoad = serverLoadMap.get(location.getServerName()); - if (serverLoad == null) { - serverLoad = hAdmin.getClusterStatus().getLoad(location.getServerName()); - serverLoadMap.put(location.getServerName(), serverLoad); - } - - HBaseFragment fragment = new HBaseFragment( - tableDesc.getUri(), - tableDesc.getName(), - htable.getName().getNameAsString(), - location.getRegionInfo().getStartKey(), - location.getRegionInfo().getEndKey(), - location.getHostname()); - - // get region size - boolean foundLength = false; - for (Map.Entry<byte[], RegionLoad> entry : serverLoad.getRegionsLoad().entrySet()) { - if (regionName.equals(Bytes.toString(entry.getKey()))) { - RegionLoad regionLoad = entry.getValue(); - long storeLength = (regionLoad.getStorefileSizeMB() + regionLoad.getMemStoreSizeMB()) * 1024L * 1024L; - if (storeLength == 0) { - // If store size is smaller than 1 MB, storeLength is zero - storeLength = 1 * 1024 * 1024; //default 1MB - } - fragment.setLength(storeLength); - foundLength = true; - break; - } - } - - if (!foundLength) { - fragment.setLength(TajoConstants.UNKNOWN_LENGTH); - } - - fragments.add(fragment); - if (LOG.isDebugEnabled()) { - LOG.debug("getFragments: fragment -> " + i + " -> " + fragment); - } - } - - if (!fragments.isEmpty()) { - ((HBaseFragment) fragments.get(fragments.size() - 1)).setLast(true); - } - return fragments; - } finally { - if (htable != null) { - htable.close(); - } - if (hAdmin != null) { - hAdmin.close(); - } - } - } - public HConnection getConnection() throws IOException { synchronized(connMap) { HConnectionKey key = new HConnectionKey(hbaseConf); http://git-wip-us.apache.org/repos/asf/tajo/blob/45ca4993/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index 678675d..f79471a 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -47,7 +47,6 @@ import java.io.IOException; import java.net.URI; import java.text.NumberFormat; import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT; @@ -178,21 +177,6 @@ public class FileTablespace extends Tablespace { return StorageUtil.concatPath(spacePath, databaseName, tableName).toUri(); } - private String partitionPath = ""; - private int currentDepth = 0; - - /** - * Set a specific partition path for partition-column only queries - * @param path The partition prefix path - */ - public void setPartitionPath(String path) { partitionPath = path; } - - /** - * Set a depth of partition path for partition-column only queries - * @param depth Depth of partitions - */ - public void setCurrentDepth(int depth) { currentDepth = depth; } - @VisibleForTesting public Appender getAppender(TableMeta meta, Schema schema, Path filePath) throws IOException { @@ -707,135 +691,6 @@ public class FileTablespace extends Tablespace { } @Override - public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numResultFragments) throws IOException { - // Listing table data file which is not empty. - // If the table is a partitioned table, return file list which has same partition key. - Path tablePath = new Path(tableDesc.getUri()); - FileSystem fs = tablePath.getFileSystem(conf); - - //In the case of partitioned table, we should return same partition key data files. - int partitionDepth = 0; - if (tableDesc.hasPartition()) { - partitionDepth = tableDesc.getPartitionMethod().getExpressionSchema().getRootColumns().size(); - } - - List<FileStatus> nonZeroLengthFiles = new ArrayList<FileStatus>(); - if (fs.exists(tablePath)) { - if (!partitionPath.isEmpty()) { - Path partPath = new Path(tableDesc.getUri() + partitionPath); - if (fs.exists(partPath)) { - getNonZeroLengthDataFiles(fs, partPath, nonZeroLengthFiles, currentPage, numResultFragments, - new AtomicInteger(0), tableDesc.hasPartition(), this.currentDepth, partitionDepth); - } - } else { - getNonZeroLengthDataFiles(fs, tablePath, nonZeroLengthFiles, currentPage, numResultFragments, - new AtomicInteger(0), tableDesc.hasPartition(), 0, partitionDepth); - } - } - - List<Fragment> fragments = new ArrayList<Fragment>(); - - String[] previousPartitionPathNames = null; - for (FileStatus eachFile: nonZeroLengthFiles) { - FileFragment fileFragment = new FileFragment(tableDesc.getName(), eachFile.getPath(), 0, eachFile.getLen(), null); - - if (partitionDepth > 0) { - // finding partition key; - Path filePath = fileFragment.getPath(); - Path parentPath = filePath; - String[] parentPathNames = new String[partitionDepth]; - for (int i = 0; i < partitionDepth; i++) { - parentPath = parentPath.getParent(); - parentPathNames[partitionDepth - i - 1] = parentPath.getName(); - } - - // If current partitionKey == previousPartitionKey, add to result. - if (previousPartitionPathNames == null) { - fragments.add(fileFragment); - } else if (previousPartitionPathNames != null && Arrays.equals(previousPartitionPathNames, parentPathNames)) { - fragments.add(fileFragment); - } else { - break; - } - previousPartitionPathNames = parentPathNames; - } else { - fragments.add(fileFragment); - } - } - - return fragments; - } - - /** - * - * @param fs - * @param path The table path - * @param result The final result files to be used - * @param startFileIndex - * @param numResultFiles - * @param currentFileIndex - * @param partitioned A flag to indicate if this table is partitioned - * @param currentDepth Current visiting depth of partition directories - * @param maxDepth The partition depth of this table - * @throws IOException - */ - private void getNonZeroLengthDataFiles(FileSystem fs, Path path, List<FileStatus> result, - int startFileIndex, int numResultFiles, - AtomicInteger currentFileIndex, boolean partitioned, - int currentDepth, int maxDepth) throws IOException { - // Intermediate directory - if (fs.isDirectory(path)) { - - FileStatus[] files = fs.listStatus(path, hiddenFileFilter); - - if (files != null && files.length > 0) { - - for (FileStatus eachFile : files) { - - // checking if the enough number of files are found - if (result.size() >= numResultFiles) { - return; - } - if (eachFile.isDirectory()) { - - getNonZeroLengthDataFiles( - fs, - eachFile.getPath(), - result, - startFileIndex, - numResultFiles, - currentFileIndex, - partitioned, - currentDepth + 1, // increment a visiting depth - maxDepth); - - // if partitioned table, we should ignore files located in the intermediate directory. - // we can ensure that this file is in leaf directory if currentDepth == maxDepth. - } else if (eachFile.isFile() && eachFile.getLen() > 0 && (!partitioned || currentDepth == maxDepth)) { - if (currentFileIndex.get() >= startFileIndex) { - result.add(eachFile); - } - currentFileIndex.incrementAndGet(); - } - } - } - - // Files located in leaf directory - } else { - FileStatus fileStatus = fs.getFileStatus(path); - if (fileStatus != null && fileStatus.getLen() > 0) { - if (currentFileIndex.get() >= startFileIndex) { - result.add(fileStatus); - } - currentFileIndex.incrementAndGet(); - if (result.size() >= numResultFiles) { - return; - } - } - } - } - - @Override public StorageProperty getProperty() { return FileStorageProperties; }
