http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aacb91/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java index 22e5cc7..d98d8cf 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java @@ -244,7 +244,7 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { } @Override - public List<PartitionDescProto> getAllPartitions(final String tableName) throws UndefinedDatabaseException, + public List<PartitionDescProto> getPartitionsOfTable(final String tableName) throws UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException { final BlockingInterface stub = conn.getTMStub();
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aacb91/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java index d9c7ff7..36b8e4e 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java @@ -35,6 +35,7 @@ import org.apache.tajo.ipc.ClientProtos.*; import org.apache.tajo.jdbc.TajoMemoryResultSet; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.util.TUtil; import java.net.InetSocketAddress; import java.net.URI; @@ -245,9 +246,9 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que return catalogClient.getFunctions(functionName); } - public List<PartitionDescProto> getAllPartitions(final String tableName) throws UndefinedDatabaseException, + public List<PartitionDescProto> getPartitionsOfTable(final String tableName) throws UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException { - return catalogClient.getAllPartitions(tableName); + return catalogClient.getPartitionsOfTable(tableName); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aacb91/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java index 368e4ae..990178a 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java @@ -67,6 +67,7 @@ public class ErrorMessages { ADD_MESSAGE(UNDEFINED_FUNCTION, "function does not exist: %s", 1); ADD_MESSAGE(UNDEFINED_PARTITION_METHOD, "table '%s' is not a partitioned table", 1); ADD_MESSAGE(UNDEFINED_PARTITION, "partition '%s' does not exist", 1); + ADD_MESSAGE(UNDEFINED_PARTITIONS, "No partitions for table '%s'", 1); ADD_MESSAGE(UNDEFINED_PARTITION_KEY, "'%s' column is not a partition key", 1); ADD_MESSAGE(UNDEFINED_OPERATOR, "operator does not exist: '%s'", 1); ADD_MESSAGE(UNDEFINED_INDEX_FOR_TABLE, "index ''%s' does not exist", 1); http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aacb91/tajo-common/src/main/java/org/apache/tajo/exception/ExceptionUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ExceptionUtil.java b/tajo-common/src/main/java/org/apache/tajo/exception/ExceptionUtil.java index 59c5776..ad6c154 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/ExceptionUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/ExceptionUtil.java @@ -55,6 +55,7 @@ public class ExceptionUtil { ADD_EXCEPTION(UNDEFINED_TABLE, UndefinedTableException.class); ADD_EXCEPTION(UNDEFINED_COLUMN, UndefinedColumnException.class); ADD_EXCEPTION(UNDEFINED_FUNCTION, UndefinedFunctionException.class); + ADD_EXCEPTION(UNDEFINED_PARTITION_METHOD, UndefinedPartitionMethodException.class); ADD_EXCEPTION(UNDEFINED_PARTITION, UndefinedPartitionException.class); ADD_EXCEPTION(UNDEFINED_PARTITION_KEY, UndefinedPartitionKeyException.class); ADD_EXCEPTION(UNDEFINED_OPERATOR, UndefinedOperatorException.class); http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aacb91/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java b/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java index 169c166..cf98f37 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java @@ -161,6 +161,10 @@ public class ReturnStateUtil { return returnError(ResultCode.UNDEFINED_PARTITION, partitionName); } + public static ReturnState errUndefinedPartitions(String tbName) { + return returnError(ResultCode.UNDEFINED_PARTITIONS, tbName); + } + public static ReturnState errUndefinedPartitionMethod(String tbName) { return returnError(ResultCode.UNDEFINED_PARTITION_METHOD, tbName); } http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aacb91/tajo-common/src/main/java/org/apache/tajo/exception/UndefinedPartitionMethodException.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/UndefinedPartitionMethodException.java b/tajo-common/src/main/java/org/apache/tajo/exception/UndefinedPartitionMethodException.java index 459269c..ca61c70 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/UndefinedPartitionMethodException.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/UndefinedPartitionMethodException.java @@ -29,7 +29,7 @@ public class UndefinedPartitionMethodException extends TajoException { super(state); } - public UndefinedPartitionMethodException(String partitionName) { - super(ResultCode.UNDEFINED_PARTITION_METHOD, partitionName); + public UndefinedPartitionMethodException(String tableName) { + super(ResultCode.UNDEFINED_PARTITION_METHOD, tableName); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aacb91/tajo-common/src/main/proto/errors.proto ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/proto/errors.proto b/tajo-common/src/main/proto/errors.proto index 573fc7e..677f799 100644 --- a/tajo-common/src/main/proto/errors.proto +++ b/tajo-common/src/main/proto/errors.proto @@ -114,6 +114,7 @@ enum ResultCode { UNDEFINED_OPERATOR = 522; // SQLState: 42883 (=UNDEFINED_FUNCTION) UNDEFINED_PARTITION_KEY = 523; // ? UNDEFINED_TABLESPACE_HANDLER = 524; // SQLState: 42T11 - No Tablespace Handler for the URI scheme + UNDEFINED_PARTITIONS = 525; // ? DUPLICATE_TABLESPACE = 531; DUPLICATE_DATABASE = 532; // SQLState: 42P04 http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aacb91/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestEvalNodeToExprConverter.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestEvalNodeToExprConverter.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestEvalNodeToExprConverter.java new file mode 100644 index 0000000..2baa79a --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestEvalNodeToExprConverter.java @@ -0,0 +1,406 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner; + +import org.apache.tajo.LocalTajoTestingUtility; +import org.apache.tajo.QueryVars; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.algebra.*; +import org.apache.tajo.benchmark.TPCH; +import org.apache.tajo.catalog.*; +import org.apache.tajo.engine.function.FunctionLoader; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.parser.sql.SQLAnalyzer; +import org.apache.tajo.plan.LogicalOptimizer; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.expr.AlgebraicUtil; +import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.NodeType; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.plan.util.EvalNodeToExprConverter; +import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.session.Session; +import org.apache.tajo.storage.TablespaceManager; +import org.apache.tajo.util.CommonTestingUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Stack; + +import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; +import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; +import static org.junit.Assert.*; + +public class TestEvalNodeToExprConverter { + private static TajoTestingCluster util; + private static CatalogService catalog; + private static SQLAnalyzer sqlAnalyzer; + private static LogicalPlanner planner; + private static TPCH tpch; + private static Session session = LocalTajoTestingUtility.createDummySession(); + + @BeforeClass + public static void setUp() throws Exception { + util = new TajoTestingCluster(); + util.startCatalogCluster(); + catalog = util.getCatalogService(); + catalog.createTablespace(DEFAULT_TABLESPACE_NAME, "hdfs://localhost:1234"); + catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); + + for (FunctionDesc funcDesc : FunctionLoader.findLegacyFunctions()) { + catalog.createFunction(funcDesc); + } + + // TPC-H Schema for Complex Queries + String [] tpchTables = { + "part", "supplier", "partsupp", "nation", "region", "lineitem" + }; + tpch = new TPCH(); + tpch.loadSchemas(); + tpch.loadOutSchema(); + for (String table : tpchTables) { + TableMeta m = CatalogUtil.newTableMeta("TEXT"); + TableDesc d = CatalogUtil.newTableDesc( + CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, table), tpch.getSchema(table), m, + CommonTestingUtil.getTestDir()); + catalog.createTable(d); + } + + sqlAnalyzer = new SQLAnalyzer(); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); + } + + @AfterClass + public static void tearDown() throws Exception { + util.shutdownCatalogCluster(); + } + + static String[] QUERIES = { + "select * from lineitem where L_ORDERKEY > 500", //0 + "select * from region where r_name = 'EUROPE'", //1 + "select * from lineitem where L_DISCOUNT >= 0.05 and L_DISCOUNT <= 0.07 OR L_QUANTITY < 24.0 ", //2 + "select * from lineitem where L_DISCOUNT between 0.06 - 0.01 and 0.08 + 0.02 and L_ORDERKEY < 24 ", //3 + "select * from lineitem where (case when L_DISCOUNT > 0.0 then L_DISCOUNT / L_TAX else null end) > 1.2 ", //4 + "select * from part where p_brand = 'Brand#23' and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK') " + + "and p_size between 1 and 10", //5 + }; + + private static QueryContext createQueryContext() { + QueryContext qc = new QueryContext(util.getConfiguration(), session); + qc.put(QueryVars.DEFAULT_SPACE_URI, "file:/"); + qc.put(QueryVars.DEFAULT_SPACE_ROOT_URI, "file:/"); + return qc; + } + + @Test + public final void testBinaryOperator1() throws CloneNotSupportedException, TajoException { + QueryContext qc = createQueryContext(); + + Expr expr = sqlAnalyzer.parse(QUERIES[0]); + + LogicalPlan plan = planner.createPlan(qc, expr); + LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog); + optimizer.optimize(plan); + + LogicalNode node = plan.getRootBlock().getRoot(); + ScanNode scanNode = PlannerUtil.findTopNode(node, NodeType.SCAN); + + EvalNodeToExprConverter convertor = new EvalNodeToExprConverter(scanNode.getTableName()); + convertor.visit(null, scanNode.getQual(), new Stack<EvalNode>()); + + Expr resultExpr = convertor.getResult(); + + BinaryOperator binaryOperator = AlgebraicUtil.findTopExpr(resultExpr, OpType.GreaterThan); + assertNotNull(binaryOperator); + + ColumnReferenceExpr column = binaryOperator.getLeft(); + assertEquals("default.lineitem", column.getQualifier()); + assertEquals("l_orderkey", column.getName()); + + LiteralValue literalValue = binaryOperator.getRight(); + assertEquals("500", literalValue.getValue()); + assertEquals(LiteralValue.LiteralType.Unsigned_Integer, literalValue.getValueType()); + } + + @Test + public final void testBinaryOperator2() throws CloneNotSupportedException, TajoException { + QueryContext qc = createQueryContext(); + + Expr expr = sqlAnalyzer.parse(QUERIES[1]); + + LogicalPlan plan = planner.createPlan(qc, expr); + LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog); + optimizer.optimize(plan); + + LogicalNode node = plan.getRootBlock().getRoot(); + ScanNode scanNode = PlannerUtil.findTopNode(node, NodeType.SCAN); + + EvalNodeToExprConverter convertor = new EvalNodeToExprConverter(scanNode.getTableName()); + convertor.visit(null, scanNode.getQual(), new Stack<EvalNode>()); + + Expr resultExpr = convertor.getResult(); + BinaryOperator equals = AlgebraicUtil.findTopExpr(resultExpr, OpType.Equals); + assertNotNull(equals); + + ColumnReferenceExpr column = equals.getLeft(); + assertEquals("default.region", column.getQualifier()); + assertEquals("r_name", column.getName()); + + LiteralValue literalValue = equals.getRight(); + assertEquals("EUROPE", literalValue.getValue()); + assertEquals(LiteralValue.LiteralType.String, literalValue.getValueType()); + } + + @Test + public final void testBinaryOperator3() throws CloneNotSupportedException, TajoException { + QueryContext qc = createQueryContext(); + + Expr expr = sqlAnalyzer.parse(QUERIES[2]); + + LogicalPlan plan = planner.createPlan(qc, expr); + LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog); + optimizer.optimize(plan); + + LogicalNode node = plan.getRootBlock().getRoot(); + ScanNode scanNode = PlannerUtil.findTopNode(node, NodeType.SCAN); + + EvalNodeToExprConverter convertor = new EvalNodeToExprConverter(scanNode.getTableName()); + convertor.visit(null, scanNode.getQual(), new Stack<EvalNode>()); + + Expr resultExpr = convertor.getResult(); + + BinaryOperator greaterThanOrEquals = AlgebraicUtil.findTopExpr(resultExpr, OpType.GreaterThanOrEquals); + assertNotNull(greaterThanOrEquals); + + ColumnReferenceExpr greaterThanOrEqualsLeft = greaterThanOrEquals.getLeft(); + assertEquals("default.lineitem", greaterThanOrEqualsLeft.getQualifier()); + assertEquals("l_discount", greaterThanOrEqualsLeft.getName()); + + LiteralValue greaterThanOrEqualsRight = greaterThanOrEquals.getRight(); + assertEquals("0.05", greaterThanOrEqualsRight.getValue()); + assertEquals(LiteralValue.LiteralType.Unsigned_Float, greaterThanOrEqualsRight.getValueType()); + + BinaryOperator lessThanOrEquals = AlgebraicUtil.findTopExpr(resultExpr, OpType.LessThanOrEquals); + assertNotNull(lessThanOrEquals); + + ColumnReferenceExpr lessThanOrEqualsLeft = lessThanOrEquals.getLeft(); + assertEquals("default.lineitem", lessThanOrEqualsLeft.getQualifier()); + assertEquals("l_discount", lessThanOrEqualsLeft.getName()); + + LiteralValue lessThanOrEqualsRight = lessThanOrEquals.getRight(); + assertEquals("0.07", lessThanOrEqualsRight.getValue()); + assertEquals(LiteralValue.LiteralType.Unsigned_Float, lessThanOrEqualsRight.getValueType()); + + BinaryOperator lessThan = AlgebraicUtil.findTopExpr(resultExpr, OpType.LessThan); + assertNotNull(lessThan); + + ColumnReferenceExpr lessThanLeft = lessThan.getLeft(); + assertEquals("default.lineitem", lessThanLeft.getQualifier()); + assertEquals("l_quantity", lessThanLeft.getName()); + + LiteralValue lessThanRight = lessThan.getRight(); + assertEquals("24.0", lessThanRight.getValue()); + assertEquals(LiteralValue.LiteralType.Unsigned_Float, lessThanRight.getValueType()); + + BinaryOperator leftExpr = new BinaryOperator(OpType.And, greaterThanOrEquals, lessThanOrEquals); + + BinaryOperator topExpr = AlgebraicUtil.findTopExpr(resultExpr, OpType.Or); + assertEquals(leftExpr, topExpr.getLeft()); + assertEquals(lessThan, topExpr.getRight()); + } + + @Test + public final void testBetweenPredicate() throws CloneNotSupportedException, TajoException { + QueryContext qc = createQueryContext(); + + Expr expr = sqlAnalyzer.parse(QUERIES[3]); + + LogicalPlan plan = planner.createPlan(qc, expr); + LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog); + optimizer.optimize(plan); + + LogicalNode node = plan.getRootBlock().getRoot(); + ScanNode scanNode = PlannerUtil.findTopNode(node, NodeType.SCAN); + + EvalNodeToExprConverter convertor = new EvalNodeToExprConverter(scanNode.getTableName()); + convertor.visit(null, scanNode.getQual(), new Stack<EvalNode>()); + + Expr resultExpr = convertor.getResult(); + + BinaryOperator binaryOperator = AlgebraicUtil.findTopExpr(resultExpr, OpType.LessThan); + assertNotNull(binaryOperator); + ColumnReferenceExpr column = binaryOperator.getLeft(); + assertEquals("default.lineitem", column.getQualifier()); + assertEquals("l_orderkey", column.getName()); + + LiteralValue literalValue = binaryOperator.getRight(); + assertEquals("24", literalValue.getValue()); + assertEquals(LiteralValue.LiteralType.Unsigned_Integer, literalValue.getValueType()); + + BetweenPredicate between = AlgebraicUtil.findTopExpr(resultExpr, OpType.Between); + assertFalse(between.isNot()); + assertFalse(between.isSymmetric()); + + ColumnReferenceExpr predicand = (ColumnReferenceExpr)between.predicand(); + assertEquals("default.lineitem", predicand.getQualifier()); + assertEquals("l_discount", predicand.getName()); + + BinaryOperator begin = (BinaryOperator)between.begin(); + assertEquals(OpType.Minus, begin.getType()); + LiteralValue left = begin.getLeft(); + assertEquals("0.06", left.getValue()); + assertEquals(LiteralValue.LiteralType.Unsigned_Float, left.getValueType()); + LiteralValue right = begin.getRight(); + assertEquals("0.01", right.getValue()); + assertEquals(LiteralValue.LiteralType.Unsigned_Float, right.getValueType()); + + BinaryOperator end = (BinaryOperator)between.end(); + assertEquals(OpType.Plus, end.getType()); + left = end.getLeft(); + assertEquals("0.08", left.getValue()); + assertEquals(LiteralValue.LiteralType.Unsigned_Float, left.getValueType()); + right = end.getRight(); + assertEquals("0.02", right.getValue()); + assertEquals(LiteralValue.LiteralType.Unsigned_Float, right.getValueType()); + } + + @Test + public final void testCaseWhenPredicate() throws CloneNotSupportedException, TajoException { + QueryContext qc = createQueryContext(); + + Expr expr = sqlAnalyzer.parse(QUERIES[4]); + + LogicalPlan plan = planner.createPlan(qc, expr); + LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog); + optimizer.optimize(plan); + + LogicalNode node = plan.getRootBlock().getRoot(); + ScanNode scanNode = PlannerUtil.findTopNode(node, NodeType.SCAN); + + EvalNodeToExprConverter convertor = new EvalNodeToExprConverter(scanNode.getTableName()); + convertor.visit(null, scanNode.getQual(), new Stack<EvalNode>()); + + Expr resultExpr = convertor.getResult(); + + CaseWhenPredicate caseWhen = AlgebraicUtil.findTopExpr(resultExpr, OpType.CaseWhen); + assertNotNull(caseWhen); + + CaseWhenPredicate.WhenExpr[] whenExprs = new CaseWhenPredicate.WhenExpr[1]; + caseWhen.getWhens().toArray(whenExprs); + + BinaryOperator condition = (BinaryOperator) whenExprs[0].getCondition(); + assertEquals(OpType.GreaterThan, condition.getType()); + + ColumnReferenceExpr conditionLeft = condition.getLeft(); + assertEquals("default.lineitem", conditionLeft.getQualifier()); + assertEquals("l_discount", conditionLeft.getName()); + + LiteralValue conditionRight = condition.getRight(); + assertEquals("0.0", conditionRight.getValue()); + assertEquals(LiteralValue.LiteralType.Unsigned_Float, conditionRight.getValueType()); + + BinaryOperator result = (BinaryOperator) whenExprs[0].getResult(); + assertEquals(OpType.Divide, result.getType()); + ColumnReferenceExpr resultLeft = result.getLeft(); + assertEquals("default.lineitem", resultLeft.getQualifier()); + assertEquals("l_discount", resultLeft.getName()); + + ColumnReferenceExpr resultRight = result.getRight(); + assertEquals("default.lineitem", resultRight.getQualifier()); + assertEquals("l_tax", resultRight.getName()); + + BinaryOperator greaterThan = AlgebraicUtil.findMostBottomExpr(resultExpr, OpType.GreaterThan); + assertNotNull(greaterThan); + + assertEquals(greaterThan.getLeft(), caseWhen); + + LiteralValue binaryOperatorRight = greaterThan.getRight(); + assertEquals("1.2", binaryOperatorRight.getValue()); + assertEquals(LiteralValue.LiteralType.Unsigned_Float, conditionRight.getValueType()); + } + + @Test + public final void testThreeFilters() throws CloneNotSupportedException, TajoException { + QueryContext qc = createQueryContext(); + + Expr expr = sqlAnalyzer.parse(QUERIES[5]); + + LogicalPlan plan = planner.createPlan(qc, expr); + LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog); + optimizer.optimize(plan); + + LogicalNode node = plan.getRootBlock().getRoot(); + ScanNode scanNode = PlannerUtil.findTopNode(node, NodeType.SCAN); + + EvalNodeToExprConverter convertor = new EvalNodeToExprConverter(scanNode.getTableName()); + convertor.visit(null, scanNode.getQual(), new Stack<EvalNode>()); + + Expr resultExpr = convertor.getResult(); + + BetweenPredicate between = AlgebraicUtil.findTopExpr(resultExpr, OpType.Between); + assertFalse(between.isNot()); + assertFalse(between.isSymmetric()); + + ColumnReferenceExpr predicand = (ColumnReferenceExpr)between.predicand(); + assertEquals("default.part", predicand.getQualifier()); + assertEquals("p_size", predicand.getName()); + + LiteralValue begin = (LiteralValue)between.begin(); + assertEquals("1", begin.getValue()); + assertEquals(LiteralValue.LiteralType.Unsigned_Integer, begin.getValueType()); + + LiteralValue end = (LiteralValue)between.end(); + assertEquals("10", end.getValue()); + assertEquals(LiteralValue.LiteralType.Unsigned_Integer, end.getValueType()); + + BinaryOperator equals = AlgebraicUtil.findTopExpr(resultExpr, OpType.Equals); + assertNotNull(equals); + + ColumnReferenceExpr equalsLeft = equals.getLeft(); + assertEquals("default.part", equalsLeft.getQualifier()); + assertEquals("p_brand", equalsLeft.getName()); + + LiteralValue equalsRight = equals.getRight(); + assertEquals("Brand#23", equalsRight.getValue()); + assertEquals(LiteralValue.LiteralType.String, equalsRight.getValueType()); + + InPredicate inPredicate = AlgebraicUtil.findTopExpr(resultExpr, OpType.InPredicate); + assertNotNull(inPredicate); + + ValueListExpr valueList = (ValueListExpr)inPredicate.getInValue(); + assertEquals(4, valueList.getValues().length); + for(int i = 0; i < valueList.getValues().length; i++) { + LiteralValue literalValue = (LiteralValue) valueList.getValues()[i]; + + if (i == 0) { + assertEquals("MED BAG", literalValue.getValue()); + } else if (i == 1) { + assertEquals("MED BOX", literalValue.getValue()); + } else if (i == 2) { + assertEquals("MED PKG", literalValue.getValue()); + } else { + assertEquals("MED PACK", literalValue.getValue()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aacb91/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java index d10c0f2..6262a7a 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java @@ -92,7 +92,7 @@ public class TestAlterTable extends QueryTestCaseBase { executeDDL("alter_table_add_partition1.sql", null); executeDDL("alter_table_add_partition2.sql", null); - List<CatalogProtos.PartitionDescProto> partitions = catalog.getPartitions("TestAlterTable", "partitioned_table"); + List<CatalogProtos.PartitionDescProto> partitions = catalog.getPartitionsOfTable("TestAlterTable", "partitioned_table"); assertNotNull(partitions); assertEquals(partitions.size(), 1); assertEquals(partitions.get(0).getPartitionName(), "col3=1/col4=2"); @@ -110,7 +110,7 @@ public class TestAlterTable extends QueryTestCaseBase { executeDDL("alter_table_drop_partition1.sql", null); executeDDL("alter_table_drop_partition2.sql", null); - partitions = catalog.getPartitions("TestAlterTable", "partitioned_table"); + partitions = catalog.getPartitionsOfTable("TestAlterTable", "partitioned_table"); assertNotNull(partitions); assertEquals(partitions.size(), 0); assertFalse(fs.exists(partitionPath)); @@ -202,7 +202,7 @@ public class TestAlterTable extends QueryTestCaseBase { private void verifyPartitionCount(String databaseName, String tableName, int expectedCount) throws UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, UndefinedPartitionException { - List<CatalogProtos.PartitionDescProto> partitions = catalog.getPartitions(databaseName, tableName); + List<CatalogProtos.PartitionDescProto> partitions = catalog.getPartitionsOfTable(databaseName, tableName); assertNotNull(partitions); assertEquals(partitions.size(), expectedCount); } http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aacb91/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index 5b49507..e8ca0da 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -251,7 +251,7 @@ public class TestTablePartitions extends QueryTestCaseBase { } verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, - new String[]{"key"}, desc.getStats().getNumRows()); + new String[]{"key"}, desc.getStats().getNumRows()); executeString("DROP TABLE " + tableName + " PURGE").close(); res.close(); @@ -433,8 +433,15 @@ public class TestTablePartitions extends QueryTestCaseBase { assertEquals(resultRows2.get(res.getDouble(4))[1], res.getInt(3)); } - res = executeString("SELECT col1, col2, col3 FROM " + tableName); + res = executeString("select * from " + tableName + " WHERE (col1 ='1' or col1 = '100') and col3 > 20"); String result = resultSetToString(res); + String expectedResult = "col4,col1,col2,col3\n" + + "-------------------------------\n" + + "N,1,1,36.0\n"; + res.close(); + assertEquals(expectedResult, result); + + res = executeString("SELECT col1, col2, col3 FROM " + tableName); res.close(); verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2", "col3"}, @@ -589,7 +596,7 @@ public class TestTablePartitions extends QueryTestCaseBase { } private final void verifyKeptExistingData(ResultSet res, String tableName) throws Exception { - res = executeString("select * from " + tableName + " where col2 = 1"); + res = executeString("select * from " + tableName + " where col2 = 1 order by col4, col1, col2, col3"); String resultSetData = resultSetToString(res); res.close(); String expected = "col4,col1,col2,col3\n" + @@ -671,7 +678,7 @@ public class TestTablePartitions extends QueryTestCaseBase { } verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1"}, - desc.getStats().getNumRows()); + desc.getStats().getNumRows()); executeString("DROP TABLE " + tableName + " PURGE").close(); } @@ -1042,7 +1049,7 @@ public class TestTablePartitions extends QueryTestCaseBase { TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col2"}, - desc.getStats().getNumRows()); + desc.getStats().getNumRows()); executeString("DROP TABLE " + tableName + " PURGE").close(); } @@ -1242,6 +1249,8 @@ public class TestTablePartitions extends QueryTestCaseBase { private void verifyPartitionDirectoryFromCatalog(String databaseName, String tableName, String[] partitionColumns, Long numRows) throws Exception { int rowCount = 0; + FileSystem fs = FileSystem.get(conf); + Path partitionPath = null; // Get all partition column values StringBuilder query = new StringBuilder(); @@ -1253,7 +1262,7 @@ public class TestTablePartitions extends QueryTestCaseBase { } query.append(" ").append(partitionColumn); } - query.append(" FROM ").append(tableName); + query.append(" FROM ").append(databaseName).append(".").append(tableName); ResultSet res = executeString(query.toString()); StringBuilder partitionName = new StringBuilder(); @@ -1274,6 +1283,10 @@ public class TestTablePartitions extends QueryTestCaseBase { assertNotNull(partitionDescProto); assertTrue(partitionDescProto.getPath().indexOf(tableName + "/" + partitionName.toString()) > 0); + partitionPath = new Path(partitionDescProto.getPath()); + ContentSummary cs = fs.getContentSummary(partitionPath); + + assertEquals(cs.getLength(), partitionDescProto.getNumBytes()); rowCount++; } @@ -1313,7 +1326,7 @@ public class TestTablePartitions extends QueryTestCaseBase { // If duplicated partitions had been removed, partitions just will contain 'KEY=N' partition and 'KEY=R' // partition. In previous Query and Stage, duplicated partitions were not deleted because they had been in List. // If you want to verify duplicated partitions, you need to use List instead of Set with DerbyStore. - List<PartitionDescProto> partitions = catalog.getPartitions(DEFAULT_DATABASE_NAME, tableName); + List<PartitionDescProto> partitions = catalog.getPartitionsOfTable(DEFAULT_DATABASE_NAME, tableName); assertEquals(2, partitions.size()); PartitionDescProto firstPartition = catalog.getPartition(DEFAULT_DATABASE_NAME, tableName, "key=N"); @@ -1325,4 +1338,503 @@ public class TestTablePartitions extends QueryTestCaseBase { executeString("DROP TABLE " + tableName + " PURGE"); } } + + @Test + public final void testPatternMatchingPredicatesAndStringFunctions() throws Exception { + ResultSet res = null; + String tableName = CatalogUtil.normalizeIdentifier("testPatternMatchingPredicatesAndStringFunctions"); + String expectedResult; + + if (nodeType == NodeType.INSERT) { + executeString("create table " + tableName + + " (col1 int4, col2 int4) partition by column(l_shipdate text, l_returnflag text) ").close(); + + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size()); + assertEquals(4, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size()); + + executeString( + "insert overwrite into " + tableName + " select l_orderkey, l_partkey, l_shipdate, l_returnflag from lineitem"); + } else { + executeString( + "create table " + tableName + "(col1 int4, col2 int4) partition by column(l_shipdate text, l_returnflag text) " + + " as select l_orderkey, l_partkey, l_shipdate, l_returnflag from lineitem"); + } + + assertTrue(client.existTable(tableName)); + + // Like + res = executeString("SELECT * FROM " + tableName + + " WHERE l_shipdate LIKE '1996%' and l_returnflag = 'N' order by l_shipdate"); + + expectedResult = "col1,col2,l_shipdate,l_returnflag\n" + + "-------------------------------\n" + + "1,1,1996-03-13,N\n" + + "1,1,1996-04-12,N\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // Not like + res = executeString("SELECT * FROM " + tableName + + " WHERE l_shipdate NOT LIKE '1996%' and l_returnflag IN ('R') order by l_shipdate"); + + expectedResult = "col1,col2,l_shipdate,l_returnflag\n" + + "-------------------------------\n" + + "3,3,1993-11-09,R\n" + + "3,2,1994-02-02,R\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // In + res = executeString("SELECT * FROM " + tableName + + " WHERE l_shipdate IN ('1993-11-09', '1994-02-02', '1997-01-28') AND l_returnflag = 'R' order by l_shipdate"); + + expectedResult = "col1,col2,l_shipdate,l_returnflag\n" + + "-------------------------------\n" + + "3,3,1993-11-09,R\n" + + "3,2,1994-02-02,R\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // Similar to + res = executeString("SELECT * FROM " + tableName + " WHERE l_shipdate similar to '1993%' order by l_shipdate"); + + expectedResult = "col1,col2,l_shipdate,l_returnflag\n" + + "-------------------------------\n" + + "3,3,1993-11-09,R\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // Regular expression + res = executeString("SELECT * FROM " + tableName + + " WHERE l_shipdate regexp '[1-2][0-9][0-9][3-9]-[0-1][0-9]-[0-3][0-9]' " + + " AND l_returnflag <> 'N' ORDER BY l_shipdate"); + + expectedResult = "col1,col2,l_shipdate,l_returnflag\n" + + "-------------------------------\n" + + "3,3,1993-11-09,R\n" + + "3,2,1994-02-02,R\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // Concatenate + res = executeString("SELECT * FROM " + tableName + + " WHERE l_shipdate = ( '1996' || '-' || '03' || '-' || '13' ) order by l_shipdate"); + + expectedResult = "col1,col2,l_shipdate,l_returnflag\n" + + "-------------------------------\n" + + "1,1,1996-03-13,N\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + executeString("DROP TABLE " + tableName + " PURGE").close(); + res.close(); + } + + @Test + public final void testDatePartitionColumn() throws Exception { + ResultSet res = null; + String tableName = CatalogUtil.normalizeIdentifier("testDatePartitionColumn"); + String expectedResult; + + if (nodeType == NodeType.INSERT) { + executeString("create table " + tableName + " (col1 int4, col2 int4) partition by column(key date) ").close(); + + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size()); + assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size()); + + executeString( + "insert overwrite into " + tableName + " select l_orderkey, l_partkey, l_shipdate from lineitem"); + } else { + executeString( + "create table " + tableName + "(col1 int4, col2 int4) partition by column(key date) " + + " as select l_orderkey, l_partkey, l_shipdate::date from lineitem"); + } + + assertTrue(client.existTable(tableName)); + + // LessThanOrEquals + res = executeString("SELECT * FROM " + tableName + " WHERE key <= date '1995-09-01' order by col1, col2, key"); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "3,2,1994-02-02\n" + + "3,3,1993-11-09\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // LessThan and GreaterThan + res = executeString("SELECT * FROM " + tableName + + " WHERE key > to_date('1993-01-01', 'YYYY-MM-DD') " + + " and key < to_date('1996-01-01', 'YYYY-MM-DD') order by col1, col2, key desc"); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "3,2,1994-02-02\n" + + "3,3,1993-11-09\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // Between + res = executeString("SELECT * FROM " + tableName + + " WHERE key between date '1993-01-01' and date '1997-01-01' order by col1, col2, key desc"); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,1,1996-04-12\n" + + "1,1,1996-03-13\n" + + "3,2,1994-02-02\n" + + "3,3,1993-11-09\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // Cast + res = executeString("SELECT * FROM " + tableName + + " WHERE key > '1993-01-01'::date " + + " and key < '1997-01-01'::timestamp order by col1, col2, key "); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,1,1996-03-13\n" + + "1,1,1996-04-12\n" + + "3,2,1994-02-02\n" + + "3,3,1993-11-09\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // Interval + res = executeString("SELECT * FROM " + tableName + + " WHERE key > '1993-01-01'::date " + + " and key < date '1994-01-01' + interval '1 year' order by col1, col2, key "); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "3,2,1994-02-02\n" + + "3,3,1993-11-09\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // DateTime Function #1 + res = executeString("SELECT * FROM " + tableName + + " WHERE key > '1993-01-01'::date " + + " and key < add_months(date '1994-01-01', 12) order by col1, col2, key "); + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // DateTime Function #2 + res = executeString("SELECT * FROM " + tableName + + " WHERE key > '1993-01-01'::date " + + " and key < add_months('1994-01-01'::timestamp, 12) order by col1, col2, key "); + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + executeString("DROP TABLE " + tableName + " PURGE").close(); + res.close(); + } + + @Test + public final void testTimestampPartitionColumn() throws Exception { + ResultSet res = null; + String tableName = CatalogUtil.normalizeIdentifier("testTimestampPartitionColumn"); + String expectedResult; + + if (nodeType == NodeType.INSERT) { + executeString("create table " + tableName + + " (col1 int4, col2 int4) partition by column(key timestamp) ").close(); + + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size()); + assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size()); + + executeString( + "insert overwrite into " + tableName + + " select l_orderkey, l_partkey, to_timestamp(l_shipdate, 'YYYY-MM-DD') from lineitem"); + } else { + executeString( + "create table " + tableName + "(col1 int4, col2 int4) partition by column(key timestamp) " + + " as select l_orderkey, l_partkey, to_timestamp(l_shipdate, 'YYYY-MM-DD') from lineitem"); + } + + assertTrue(client.existTable(tableName)); + + // LessThanOrEquals + res = executeString("SELECT * FROM " + tableName + + " WHERE key <= to_timestamp('1995-09-01', 'YYYY-MM-DD') order by col1, col2, key"); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "3,2,1994-02-02 00:00:00\n" + + "3,3,1993-11-09 00:00:00\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // LessThan and GreaterThan + res = executeString("SELECT * FROM " + tableName + + " WHERE key > to_timestamp('1993-01-01', 'YYYY-MM-DD') and " + + "key < to_timestamp('1996-01-01', 'YYYY-MM-DD') order by col1, col2, key desc"); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "3,2,1994-02-02 00:00:00\n" + + "3,3,1993-11-09 00:00:00\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // Between + res = executeString("SELECT * FROM " + tableName + + " WHERE key between to_timestamp('1993-01-01', 'YYYY-MM-DD') " + + "and to_timestamp('1997-01-01', 'YYYY-MM-DD') order by col1, col2, key desc"); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,1,1996-04-12 00:00:00\n" + + "1,1,1996-03-13 00:00:00\n" + + "3,2,1994-02-02 00:00:00\n" + + "3,3,1993-11-09 00:00:00\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + executeString("DROP TABLE " + tableName + " PURGE").close(); + res.close(); + } + + @Test + public final void testTimePartitionColumn() throws Exception { + ResultSet res = null; + String tableName = CatalogUtil.normalizeIdentifier("testTimePartitionColumn"); + String expectedResult; + + if (nodeType == NodeType.INSERT) { + executeString("create table " + tableName + + " (col1 int4, col2 int4) partition by column(key time) ").close(); + + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size()); + assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size()); + + executeString( + "insert overwrite into " + tableName + + " select l_orderkey, l_partkey " + + " , CASE l_shipdate WHEN '1996-03-13' THEN cast ('11:20:40' as time) " + + " WHEN '1997-01-28' THEN cast ('12:10:20' as time) " + + " WHEN '1994-02-02' THEN cast ('12:10:30' as time) " + + " ELSE cast ('00:00:00' as time) END " + + " from lineitem"); + } else { + executeString( + "create table " + tableName + "(col1 int4, col2 int4) partition by column(key time) " + + " as select l_orderkey, l_partkey " + + " , CASE l_shipdate WHEN '1996-03-13' THEN cast ('11:20:40' as time) " + + " WHEN '1997-01-28' THEN cast ('12:10:20' as time) " + + " WHEN '1994-02-02' THEN cast ('12:10:30' as time) " + + " ELSE cast ('00:00:00' as time) END " + + " from lineitem"); + } + + assertTrue(client.existTable(tableName)); + // LessThanOrEquals + res = executeString("SELECT * FROM " + tableName + + " WHERE key <= cast('12:10:20' as time) order by col1, col2, key"); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,1,00:00:00\n" + + "1,1,11:20:40\n" + + "2,2,12:10:20\n" + + "3,3,00:00:00\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // LessThan and GreaterThan + res = executeString("SELECT * FROM " + tableName + + " WHERE key > cast('00:00:00' as time) and " + + "key < cast('12:10:00' as time) order by col1, col2, key desc"); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,1,11:20:40\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // Between + res = executeString("SELECT * FROM " + tableName + + " WHERE key between cast('11:00:00' as time) " + + "and cast('13:00:00' as time) order by col1, col2, key desc"); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,1,11:20:40\n" + + "2,2,12:10:20\n" + + "3,2,12:10:30\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + executeString("DROP TABLE " + tableName + " PURGE").close(); + res.close(); + } + + @Test + public final void testDatabaseNameIncludeTableName() throws Exception { + executeString("create database test_partition").close(); + + String databaseName = "test_partition"; + String tableName = CatalogUtil.normalizeIdentifier("part"); + + if (nodeType == NodeType.INSERT) { + executeString( + "create table " + databaseName + "." + tableName + " (col1 int4, col2 int4) partition by column(key float8) "); + + assertTrue(catalog.existsTable(databaseName, tableName)); + assertEquals(2, catalog.getTableDesc(databaseName, tableName).getSchema().size()); + assertEquals(3, catalog.getTableDesc(databaseName, tableName).getLogicalSchema().size()); + + executeString( + "insert overwrite into " + databaseName + "." + tableName + " select l_orderkey, l_partkey, " + + "l_quantity from lineitem"); + } else { + executeString( + "create table "+ databaseName + "." + tableName + "(col1 int4, col2 int4) partition by column(key float8) " + + " as select l_orderkey, l_partkey, l_quantity from lineitem"); + } + + TableDesc tableDesc = catalog.getTableDesc(databaseName, tableName); + verifyPartitionDirectoryFromCatalog(databaseName, tableName, new String[]{"key"}, + tableDesc.getStats().getNumRows()); + + ResultSet res = executeString("select * from " + databaseName + "." + tableName + " ORDER BY key"); + + String result = resultSetToString(res); + String expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,1,17.0\n" + + "1,1,36.0\n" + + "2,2,38.0\n" + + "3,2,45.0\n" + + "3,3,49.0\n"; + res.close(); + assertEquals(expectedResult, result); + + executeString("DROP TABLE " + databaseName + "." + tableName + " PURGE").close(); + executeString("DROP database " + databaseName).close(); + } + + @Test + public void testAbnormalDirectories() throws Exception { + ResultSet res = null; + FileSystem fs = FileSystem.get(conf); + Path path = null; + + String tableName = CatalogUtil.normalizeIdentifier("testAbnormalDirectories"); + if (nodeType == NodeType.INSERT) { + executeString( + "create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) ").close(); + executeString( + "insert overwrite into " + tableName + " select l_orderkey, l_partkey, " + + "l_quantity from lineitem").close(); + } else { + executeString( + "create table " + tableName + "(col1 int4, col2 int4) partition by column(key float8) " + + " as select l_orderkey, l_partkey, l_quantity from lineitem").close(); + } + + TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"}, + tableDesc.getStats().getNumRows()); + + // When partitions only exist on file system without catalog. + String externalTableName = "testCreateExternalColumnPartitionedTable"; + + executeString("create external table " + externalTableName + " (col1 int4, col2 int4) " + + " USING TEXT WITH ('text.delimiter'='|') PARTITION BY COLUMN (key float8) " + + " location '" + tableDesc.getUri().getPath() + "'").close(); + + res = executeString("SELECT COUNT(*) AS cnt FROM " + externalTableName); + String result = resultSetToString(res); + String expectedResult = "cnt\n" + + "-------------------------------\n" + + "5\n"; + res.close(); + assertEquals(expectedResult, result); + + // Make abnormal directories + path = new Path(tableDesc.getUri().getPath(), "key=100.0"); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().getPath(), "key="); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().getPath(), "col1=a"); + fs.mkdirs(path); + assertEquals(8, fs.listStatus(path.getParent()).length); + + res = executeString("SELECT COUNT(*) AS cnt FROM " + externalTableName + " WHERE key > 40.0"); + result = resultSetToString(res); + expectedResult = "cnt\n" + + "-------------------------------\n" + + "2\n"; + res.close(); + assertEquals(expectedResult, result); + + // Remove existing partition directory + path = new Path(tableDesc.getUri().getPath(), "key=36.0"); + fs.delete(path, true); + + res = executeString("SELECT * FROM " + tableName + " ORDER BY key"); + result = resultSetToString(res); + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,1,17.0\n" + + "2,2,38.0\n" + + "3,2,45.0\n" + + "3,3,49.0\n"; + res.close(); + assertEquals(expectedResult, result); + + res = executeString("SELECT COUNT(*) AS cnt FROM " + tableName + " WHERE key > 30.0"); + result = resultSetToString(res); + expectedResult = "cnt\n" + + "-------------------------------\n" + + "3\n"; + res.close(); + assertEquals(expectedResult, result); + + // Sort + String sortedTableName = "sortedPartitionTable"; + executeString("create table " + sortedTableName + " AS SELECT * FROM " + tableName + + " ORDER BY col1, col2 desc").close(); + + res = executeString("SELECT * FROM " + sortedTableName + " ORDER BY col1, col2 desc;"); + result = resultSetToString(res); + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,1,17.0\n" + + "2,2,38.0\n" + + "3,3,49.0\n" + + "3,2,45.0\n"; + res.close(); + assertEquals(expectedResult, result); + + executeString("DROP TABLE " + sortedTableName + " PURGE").close(); + executeString("DROP TABLE " + externalTableName).close(); + executeString("DROP TABLE " + tableName + " PURGE").close(); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aacb91/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java index 01d3c0f..445644b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java @@ -38,6 +38,7 @@ import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.plan.logical.StoreTableNode; import org.apache.tajo.storage.*; import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.StringUtils; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -173,17 +174,18 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec { PartitionKeyProto.Builder keyBuilder = PartitionKeyProto.newBuilder(); keyBuilder.setColumnName(split[0]); - keyBuilder.setPartitionValue(split[1]); + // Partition path have been escaped to avoid URISyntaxException. But partition value of partition keys table + // need to contain unescaped value for comparing filter conditions in select statement. + keyBuilder.setPartitionValue(StringUtils.unescapePathName(split[1])); builder.addPartitionKeys(keyBuilder.build()); } if (this.plan.getUri() == null) { - // In CTAS, the uri would be null. So, - String[] split = CatalogUtil.splitTableName(plan.getTableName()); - int endIndex = storeTablePath.toString().indexOf(split[1]) + split[1].length(); + // In CTAS, the uri would be null. So, it get the uri from staging directory. + int endIndex = storeTablePath.toString().indexOf(FileTablespace.TMP_STAGING_DIR_PREFIX); String outputPath = storeTablePath.toString().substring(0, endIndex); - builder.setPath(outputPath + "/" + partition); + builder.setPath(outputPath + partition); } else { builder.setPath(this.plan.getUri().toString() + "/" + partition); } http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aacb91/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index fa36b79..ebd6e2b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -974,7 +974,7 @@ public class TajoMasterClientService extends AbstractService { tableName = request.getValue(); } - List<PartitionDescProto> partitions = catalog.getPartitions(databaseName, tableName); + List<PartitionDescProto> partitions = catalog.getPartitionsOfTable(databaseName, tableName); return PartitionListResponse.newBuilder() .setState(OK) .addAllPartition(partitions) http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aacb91/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java index dac99e5..57d01aa 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java @@ -20,6 +20,7 @@ package org.apache.tajo.master.exec; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -508,8 +509,14 @@ public class DDLExecutor { throw new AmbiguousPartitionDirectoryExistException(assumedDirectory.toString()); } + long numBytes = 0L; + if (fs.exists(partitionPath)) { + ContentSummary summary = fs.getContentSummary(partitionPath); + numBytes = summary.getLength(); + } + catalog.alterTable(CatalogUtil.addOrDropPartition(qualifiedName, alterTable.getPartitionColumns(), - alterTable.getPartitionValues(), alterTable.getLocation(), AlterTableType.ADD_PARTITION)); + alterTable.getPartitionValues(), alterTable.getLocation(), AlterTableType.ADD_PARTITION, numBytes)); // If the partition's path doesn't exist, this would make the directory by force. if (!fs.exists(partitionPath)) { @@ -595,15 +602,15 @@ public class DDLExecutor { PathFilter[] filters = PartitionedTableRewriter.buildAllAcceptingPathFilters(partitionColumns); // loop from one to the number of partition columns - Path [] filteredPaths = PartitionedTableRewriter.toPathArray(fs.listStatus(tablePath, filters[0])); + Path [] filteredPaths = toPathArray(fs.listStatus(tablePath, filters[0])); // Get all file status matched to a ith level path filter. for (int i = 1; i < partitionColumns.size(); i++) { - filteredPaths = PartitionedTableRewriter.toPathArray(fs.listStatus(filteredPaths, filters[i])); + filteredPaths = toPathArray(fs.listStatus(filteredPaths, filters[i])); } // Find missing partitions from filesystem - List<PartitionDescProto> existingPartitions = catalog.getPartitions(databaseName, simpleTableName); + List<PartitionDescProto> existingPartitions = catalog.getPartitionsOfTable(databaseName, simpleTableName); List<String> existingPartitionNames = TUtil.newList(); Path existingPartitionPath = null; @@ -699,4 +706,13 @@ public class DDLExecutor { final TableDesc tableDesc = catalog.getTableDesc(tableName); return tableDesc.getSchema().containsByName(columnName); } + + private Path [] toPathArray(FileStatus[] fileStatuses) { + Path [] paths = new Path[fileStatuses.length]; + for (int i = 0; i < fileStatuses.length; i++) { + FileStatus fileStatus = fileStatuses[i]; + paths[i] = fileStatus.getPath(); + } + return paths; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aacb91/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index 10c7ffe..12aba74 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -24,6 +24,7 @@ import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.event.EventHandler; @@ -509,8 +510,11 @@ public class Query implements EventHandler<QueryEvent> { if (queryContext.hasOutputTableUri() && queryContext.hasPartition()) { List<PartitionDescProto> partitions = query.getPartitions(); if (partitions != null) { - String databaseName, simpleTableName; + // Set contents length and file count to PartitionDescProto by listing final output directories. + List<PartitionDescProto> finalPartitions = getPartitionsWithContentsSummary(query.systemConf, + finalOutputDir, partitions); + String databaseName, simpleTableName; if (CatalogUtil.isFQTableName(tableDesc.getName())) { String[] split = CatalogUtil.splitFQTableName(tableDesc.getName()); databaseName = split[0]; @@ -521,7 +525,7 @@ public class Query implements EventHandler<QueryEvent> { } // Store partitions to CatalogStore using alter table statement. - catalog.addPartitions(databaseName, simpleTableName, partitions, true); + catalog.addPartitions(databaseName, simpleTableName, finalPartitions, true); LOG.info("Added partitions to catalog (total=" + partitions.size() + ")"); } else { LOG.info("Can't find partitions for adding."); @@ -536,6 +540,21 @@ public class Query implements EventHandler<QueryEvent> { return QueryState.QUERY_SUCCEEDED; } + private List<PartitionDescProto> getPartitionsWithContentsSummary(TajoConf conf, Path outputDir, + List<PartitionDescProto> partitions) throws IOException { + List<PartitionDescProto> finalPartitions = TUtil.newList(); + + FileSystem fileSystem = outputDir.getFileSystem(conf); + for (PartitionDescProto partition : partitions) { + PartitionDescProto.Builder builder = partition.toBuilder(); + Path partitionPath = new Path(outputDir, partition.getPath()); + ContentSummary contentSummary = fileSystem.getContentSummary(partitionPath); + builder.setNumBytes(contentSummary.getLength()); + finalPartitions.add(builder.build()); + } + return finalPartitions; + } + private static interface QueryHook { boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir); void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query, http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aacb91/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java index 734beb8..ef5cf08 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java @@ -18,15 +18,19 @@ package org.apache.tajo.plan.expr; +import com.google.common.base.Preconditions; import org.apache.tajo.algebra.*; import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.exception.TajoException; +import org.apache.tajo.plan.util.ExprFinder; import org.apache.tajo.plan.visitor.SimpleAlgebraVisitor; +import org.apache.tajo.util.TUtil; import java.util.*; public class AlgebraicUtil { - + /** * Transpose a given comparison expression into the expression * where the variable corresponding to the target is placed @@ -493,4 +497,146 @@ public class AlgebraicUtil { return super.visitTimeLiteral(ctx, stack, expr); } } + + /** + * Find the top expr matched to type from the given expr + * + * @param expr start expr + * @param type to find + * @return a found expr + */ + public static <T extends Expr> T findTopExpr(Expr expr, OpType type) throws TajoException { + Preconditions.checkNotNull(expr); + Preconditions.checkNotNull(type); + + List<Expr> exprs = ExprFinder.findsInOrder(expr, type); + if (exprs.size() == 0) { + return null; + } else { + return (T) exprs.get(0); + } + } + + /** + * Find the most bottom expr matched to type from the given expr + * + * @param expr start expr + * @param type to find + * @return a found expr + */ + public static <T extends Expr> T findMostBottomExpr(Expr expr, OpType type) throws TajoException { + Preconditions.checkNotNull(expr); + Preconditions.checkNotNull(type); + + List<Expr> exprs = ExprFinder.findsInOrder(expr, type); + if (exprs.size() == 0) { + return null; + } else { + return (T) exprs.get(exprs.size()-1); + } + } + + /** + * Transforms an algebra expression to an array of conjunctive normal formed algebra expressions. + * + * @param expr The algebra expression to be transformed to an array of CNF-formed expressions. + * @return An array of CNF-formed algebra expressions + */ + public static Expr[] toConjunctiveNormalFormArray(Expr expr) { + List<Expr> list = TUtil.newList(); + toConjunctiveNormalFormArrayRecursive(expr, list); + return list.toArray(new Expr[list.size()]); + } + + private static void toConjunctiveNormalFormArrayRecursive(Expr node, List<Expr> found) { + if (node.getType() == OpType.And) { + toConjunctiveNormalFormArrayRecursive(((BinaryOperator) node).getLeft(), found); + toConjunctiveNormalFormArrayRecursive(((BinaryOperator) node).getRight(), found); + } else { + found.add(node); + } + } + + /** + * Build Exprs for all columns with a list of filter conditions. + * + * For example, consider you have a partitioned table for three columns (i.e., col1, col2, col3). + * Then, this methods will create three Exprs for (col1), (col2), (col3). + * + * Assume that an user gives a condition WHERE col1 ='A' and col3 = 'C'. + * There is no filter condition corresponding to col2. + * Then, the path filter conditions are corresponding to the followings: + * + * The first Expr: col1 = 'A' + * The second Expr: col2 IS NOT NULL + * The third Expr: col3 = 'C' + * + * 'IS NOT NULL' predicate is always true against the partition path. + * + * + * @param partitionColumns + * @param conjunctiveForms + * @return + */ + public static Expr[] getRearrangedCNFExpressions(String tableName, + List<CatalogProtos.ColumnProto> partitionColumns, Expr[] conjunctiveForms) { + Expr[] filters = new Expr[partitionColumns.size()]; + Column target; + + for (int i = 0; i < partitionColumns.size(); i++) { + List<Expr> accumulatedFilters = TUtil.newList(); + target = new Column(partitionColumns.get(i)); + ColumnReferenceExpr columnReference = new ColumnReferenceExpr(tableName, target.getSimpleName()); + + if (conjunctiveForms == null) { + accumulatedFilters.add(new IsNullPredicate(true, columnReference)); + } else { + for (Expr expr : conjunctiveForms) { + Set<ColumnReferenceExpr> columnSet = ExprFinder.finds(expr, OpType.Column); + if (columnSet.contains(columnReference)) { + // Accumulate one qual per level + accumulatedFilters.add(expr); + } + } + + if (accumulatedFilters.size() == 0) { + accumulatedFilters.add(new IsNullPredicate(true, columnReference)); + } + } + + Expr filterPerLevel = AlgebraicUtil.createSingletonExprFromCNFByExpr( + accumulatedFilters.toArray(new Expr[accumulatedFilters.size()])); + filters[i] = filterPerLevel; + } + + return filters; + } + + /** + * Convert a list of conjunctive normal forms into a singleton expression. + * + * @param cnfExprs + * @return The EvalNode object that merges all CNF-formed expressions. + */ + public static Expr createSingletonExprFromCNFByExpr(Expr... cnfExprs) { + if (cnfExprs.length == 1) { + return cnfExprs[0]; + } + + return createSingletonExprFromCNFRecursiveByExpr(cnfExprs, 0); + } + + private static Expr createSingletonExprFromCNFRecursiveByExpr(Expr[] exprs, int idx) { + if (idx >= exprs.length) { + throw new ArrayIndexOutOfBoundsException("index " + idx + " is exceeded the maximum length ("+ + exprs.length+") of EvalNode"); + } + + if (idx == exprs.length - 2) { + return new BinaryOperator(OpType.And, exprs[idx], exprs[idx + 1]); + } else { + return new BinaryOperator(OpType.And, exprs[idx], createSingletonExprFromCNFRecursiveByExpr(exprs, idx + 1)); + } + } + } http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aacb91/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java index 5123fc4..5b1a1f1 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java @@ -24,19 +24,19 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.*; import org.apache.tajo.OverridableConf; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.partition.PartitionMethodDesc; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionsByAlgebraProto; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.exception.TajoException; -import org.apache.tajo.exception.TajoInternalError; +import org.apache.tajo.exception.*; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext; +import org.apache.tajo.plan.util.EvalNodeToExprConverter; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; import org.apache.tajo.storage.Tuple; @@ -44,11 +44,12 @@ import org.apache.tajo.storage.VTuple; import org.apache.tajo.util.StringUtils; import java.io.IOException; -import java.util.List; -import java.util.Set; -import java.util.Stack; +import java.util.*; public class PartitionedTableRewriter implements LogicalPlanRewriteRule { + private CatalogService catalog; + private long totalVolume; + private static final Log LOG = LogFactory.getLog(PartitionedTableRewriter.class); private static final String NAME = "Partitioned Table Rewriter"; @@ -78,6 +79,7 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule { public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws TajoException { LogicalPlan plan = context.getPlan(); LogicalPlan.QueryBlock rootBlock = plan.getRootBlock(); + this.catalog = context.getCatalog(); rewriter.visit(context.getQueryContext(), plan, rootBlock, rootBlock.getRoot(), new Stack<LogicalNode>()); return plan; } @@ -108,6 +110,13 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule { } } + private Path [] findFilteredPaths(OverridableConf queryContext, String tableName, + Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath) + throws IOException, UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, + UndefinedOperatorException, UnsupportedException { + return findFilteredPaths(queryContext, tableName, partitionColumns, conjunctiveForms, tablePath, null); + } + /** * It assumes that each conjunctive form corresponds to one column. * @@ -118,13 +127,82 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule { * @return * @throws IOException */ - private Path [] findFilteredPaths(OverridableConf queryContext, Schema partitionColumns, EvalNode [] conjunctiveForms, - Path tablePath) - throws IOException { + private Path [] findFilteredPaths(OverridableConf queryContext, String tableName, + Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath, ScanNode scanNode) + throws IOException, UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, + UndefinedOperatorException, UnsupportedException { + Path [] filteredPaths = null; FileSystem fs = tablePath.getFileSystem(queryContext.getConf()); + String [] splits = CatalogUtil.splitFQTableName(tableName); + List<PartitionDescProto> partitions = null; + + try { + if (conjunctiveForms == null) { + partitions = catalog.getPartitionsOfTable(splits[0], splits[1]); + if (partitions.isEmpty()) { + filteredPaths = findFilteredPathsFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); + } else { + filteredPaths = findFilteredPathsByPartitionDesc(partitions); + } + } else { + if (catalog.existPartitions(splits[0], splits[1])) { + PartitionsByAlgebraProto request = getPartitionsAlgebraProto(splits[0], splits[1], conjunctiveForms); + partitions = catalog.getPartitionsByAlgebra(request); + filteredPaths = findFilteredPathsByPartitionDesc(partitions); + } else { + filteredPaths = findFilteredPathsFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); + } + } + } catch (UnsupportedException ue) { + // Partial catalog might not allow some filter conditions. For example, HiveMetastore doesn't In statement, + // regexp statement and so on. Above case, Tajo need to build filtered path by listing hdfs directories. + LOG.warn(ue.getMessage()); + partitions = catalog.getPartitionsOfTable(splits[0], splits[1]); + if (partitions.isEmpty()) { + filteredPaths = findFilteredPathsFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); + } else { + filteredPaths = findFilteredPathsByPartitionDesc(partitions); + } + scanNode.setQual(AlgebraicUtil.createSingletonExprFromCNF(conjunctiveForms)); + } + LOG.info("Filtered directory or files: " + filteredPaths.length); + return filteredPaths; + } + + /** + * Build list of partition path by PartitionDescProto which is generated from CatalogStore. + * + * @param partitions + * @return + */ + private Path[] findFilteredPathsByPartitionDesc(List<PartitionDescProto> partitions) { + Path [] filteredPaths = new Path[partitions.size()]; + for (int i = 0; i < partitions.size(); i++) { + PartitionDescProto partition = partitions.get(i); + filteredPaths[i] = new Path(partition.getPath()); + totalVolume += partition.getNumBytes(); + } + return filteredPaths; + } + + /** + * Build list of partition path by filtering directories in the given table path. + * + * + * @param partitionColumns + * @param conjunctiveForms + * @param fs + * @param tablePath + * @return + * @throws IOException + */ + private Path [] findFilteredPathsFromFileSystem(Schema partitionColumns, EvalNode [] conjunctiveForms, + FileSystem fs, Path tablePath) throws IOException{ + Path [] filteredPaths = null; PathFilter [] filters; + if (conjunctiveForms == null) { filters = buildAllAcceptingPathFilters(partitionColumns); } else { @@ -132,18 +210,43 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule { } // loop from one to the number of partition columns - Path [] filteredPaths = toPathArray(fs.listStatus(tablePath, filters[0])); + filteredPaths = toPathArray(fs.listStatus(tablePath, filters[0])); for (int i = 1; i < partitionColumns.size(); i++) { // Get all file status matched to a ith level path filter. filteredPaths = toPathArray(fs.listStatus(filteredPaths, filters[i])); } - - LOG.info("Filtered directory or files: " + filteredPaths.length); return filteredPaths; } /** + * Build algebra expressions for querying partitions and partition keys by using EvalNodeToExprConverter. + * + * @param databaseName the database name + * @param tableName the table name + * @param conjunctiveForms EvalNode which contains filter conditions + * @return + */ + public static PartitionsByAlgebraProto getPartitionsAlgebraProto( + String databaseName, String tableName, EvalNode [] conjunctiveForms) { + + PartitionsByAlgebraProto.Builder request = PartitionsByAlgebraProto.newBuilder(); + request.setDatabaseName(databaseName); + request.setTableName(tableName); + + if (conjunctiveForms != null) { + EvalNode evalNode = AlgebraicUtil.createSingletonExprFromCNF(conjunctiveForms); + EvalNodeToExprConverter convertor = new EvalNodeToExprConverter(databaseName + "." + tableName); + convertor.visit(null, evalNode, new Stack<EvalNode>()); + request.setAlgebra(convertor.getResult().toJson()); + } else { + request.setAlgebra(""); + } + + return request.build(); + } + + /** * Build path filters for all levels with a list of filter conditions. * * For example, consider you have a partitioned table for three columns (i.e., col1, col2, col3). @@ -191,6 +294,7 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule { accumulatedFilters.toArray(new EvalNode[accumulatedFilters.size()])); filters[i] = new PartitionPathFilter(partitionColumns, filterPerLevel); } + return filters; } @@ -214,15 +318,19 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule { return filters; } - public static Path [] toPathArray(FileStatus[] fileStatuses) { + private Path [] toPathArray(FileStatus[] fileStatuses) { Path [] paths = new Path[fileStatuses.length]; - for (int j = 0; j < fileStatuses.length; j++) { - paths[j] = fileStatuses[j].getPath(); + for (int i = 0; i < fileStatuses.length; i++) { + FileStatus fileStatus = fileStatuses[i]; + paths[i] = fileStatus.getPath(); + totalVolume += fileStatus.getLen(); } return paths; } - private Path [] findFilteredPartitionPaths(OverridableConf queryContext, ScanNode scanNode) throws IOException { + public Path [] findFilteredPartitionPaths(OverridableConf queryContext, ScanNode scanNode) throws IOException, + UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, + UndefinedOperatorException, UnsupportedException { TableDesc table = scanNode.getTableDesc(); PartitionMethodDesc partitionDesc = scanNode.getTableDesc().getPartitionMethod(); @@ -261,10 +369,10 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule { } if (indexablePredicateSet.size() > 0) { // There are at least one indexable predicates - return findFilteredPaths(queryContext, paritionValuesSchema, - indexablePredicateSet.toArray(new EvalNode[indexablePredicateSet.size()]), new Path(table.getUri())); + return findFilteredPaths(queryContext, table.getName(), paritionValuesSchema, + indexablePredicateSet.toArray(new EvalNode[indexablePredicateSet.size()]), new Path(table.getUri()), scanNode); } else { // otherwise, we will get all partition paths. - return findFilteredPaths(queryContext, paritionValuesSchema, null, new Path(table.getUri())); + return findFilteredPaths(queryContext, table.getName(), paritionValuesSchema, null, new Path(table.getUri())); } } @@ -313,25 +421,6 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule { } } - private void updateTableStat(OverridableConf queryContext, PartitionedTableScanNode scanNode) - throws TajoException { - if (scanNode.getInputPaths().length > 0) { - try { - FileSystem fs = scanNode.getInputPaths()[0].getFileSystem(queryContext.getConf()); - long totalVolume = 0; - - for (Path input : scanNode.getInputPaths()) { - ContentSummary summary = fs.getContentSummary(input); - totalVolume += summary.getLength(); - totalVolume += summary.getFileCount(); - } - scanNode.getTableDesc().getStats().setNumBytes(totalVolume); - } catch (Throwable e) { - throw new TajoInternalError(e); - } - } - } - /** * Take a look at a column partition path. A partition path consists * of a table path part and column values part. This method transforms @@ -411,7 +500,7 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule { plan.addHistory("PartitionTableRewriter chooses " + filteredPaths.length + " of partitions"); PartitionedTableScanNode rewrittenScanNode = plan.createNode(PartitionedTableScanNode.class); rewrittenScanNode.init(scanNode, filteredPaths); - updateTableStat(queryContext, rewrittenScanNode); + rewrittenScanNode.getTableDesc().getStats().setNumBytes(totalVolume); // if it is topmost node, set it as the rootnode of this block. if (stack.empty() || block.getRoot().equals(scanNode)) {
