TAJO-1403: Improve 'Simple Query' with only partition columns and constant values.
Closes #434 Signed-off-by: Jihoon Son <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8d0146b8 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8d0146b8 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8d0146b8 Branch: refs/heads/index_support Commit: 8d0146b8d8f5eeac37fe3f531ce1362af3b20c2f Parents: 3aaff38 Author: Jihoon Son <[email protected]> Authored: Mon Mar 23 11:48:18 2015 +0900 Committer: Jihoon Son <[email protected]> Committed: Mon Mar 23 11:48:18 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../exec/NonForwardQueryResultFileScanner.java | 37 +++++++++++- .../apache/tajo/master/exec/QueryExecutor.java | 5 ++ .../apache/tajo/master/TestGlobalPlanner.java | 3 +- .../org/apache/tajo/plan/expr/EvalTreeUtil.java | 61 ++++++++++++++++++++ .../org/apache/tajo/plan/util/PlannerUtil.java | 26 ++++++++- .../apache/tajo/storage/FileStorageManager.java | 27 ++++++++- 7 files changed, 155 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/8d0146b8/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index e3636e6..ad3a6bd 100644 --- a/CHANGES +++ b/CHANGES @@ -9,6 +9,9 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-1403: Improve 'Simple Query' with only partition columns and constant + values. (Contributed by Dongjoon Hyun, Committed by jihoon) + TAJO-1418: Comment on TAJO_PULLSERVER_STANDALONE in tajo-env.sh is not consistent. (Contributed by navis, Committed by hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/8d0146b8/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java index dc0c44a..6c02aa9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java @@ -24,19 +24,23 @@ import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryId; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TaskId; +import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.plan.expr.EvalTreeUtil; import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.engine.planner.physical.SeqScanExec; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.storage.FileStorageManager; import org.apache.tajo.storage.RowStoreUtil; import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; +import org.apache.tajo.util.StringUtils; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -74,10 +78,37 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc initSeqScanExec(); } + /** + * Set partition path and depth if ScanNode's qualification exists + * + * @param storageManager target storage manager to be set with partition info + */ + private void setPartition(StorageManager storageManager) { + if (tableDesc.isExternal() && tableDesc.hasPartition() && scanNode.getQual() != null && + storageManager instanceof FileStorageManager) { + StringBuffer path = new StringBuffer(); + int depth = 0; + if (tableDesc.hasPartition()) { + for (Column c : tableDesc.getPartitionMethod().getExpressionSchema().getColumns()) { + String partitionValue = EvalTreeUtil.getPartitionValue(scanNode.getQual(), c.getSimpleName()); + if (partitionValue == null) + break; + path.append(String.format("/%s=%s", c.getSimpleName(), StringUtils.escapePathName(partitionValue))); + depth++; + } + } + ((FileStorageManager)storageManager).setPartitionPath(path.toString()); + ((FileStorageManager)storageManager).setCurrentDepth(depth); + scanNode.setQual(null); + } + } + private void initSeqScanExec() throws IOException { - List<Fragment> fragments = StorageManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType()) - .getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN); - + StorageManager storageManager = StorageManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType()); + List<Fragment> fragments = null; + setPartition(storageManager); + fragments = storageManager.getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN); + if (fragments != null && !fragments.isEmpty()) { FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[] {})); this.taskContext = new TaskAttemptContext( http://git-wip-us.apache.org/repos/asf/tajo/blob/8d0146b8/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index db82fca..aa8b228 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -212,6 +212,11 @@ public class QueryExecutor { scanNode = plan.getRootBlock().getNode(NodeType.PARTITIONS_SCAN); } TableDesc desc = scanNode.getTableDesc(); + // Keep info for partition-column-only queries + SelectionNode selectionNode = plan.getRootBlock().getNode(NodeType.SELECTION); + if (desc.isExternal() && desc.hasPartition() && selectionNode != null) { + scanNode.setQual(selectionNode.getQual()); + } int maxRow = Integer.MAX_VALUE; if (plan.getRootBlock().hasNode(NodeType.LIMIT)) { LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT); http://git-wip-us.apache.org/repos/asf/tajo/blob/8d0146b8/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java index d0f7cf4..45c94a3 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java @@ -317,8 +317,9 @@ public class TestGlobalPlanner { plan = buildPlan("select * from customer where c_nationkey = 1"); assertFalse(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan())); + // c_nationkey is partition column plan = buildPlan("select * from customer_parts where c_nationkey = 1"); - assertFalse(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan())); + assertTrue(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan())); // same column order plan = buildPlan("select c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment" + http://git-wip-us.apache.org/repos/asf/tajo/blob/8d0146b8/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java index 23b4659..0d0ea88 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java @@ -573,4 +573,65 @@ public class EvalTreeUtil { public static Datum evaluateImmediately(EvalNode evalNode) { return evalNode.eval(null, null); } + + /** + * Checks whether EvalNode consists of only partition columns and const values. + * The partition based simple query can be defined as 'select * from tb_name where col_name1="X" and col_name2="Y" [LIMIT Z]', + * whose WHERE clause consists of only partition-columns with constant values. + * Partition columns must be able to form a prefix of HDFS path like '/tb_name1/col_name1=X/col_name2=Y'. + * + * @param node The qualification node of a SELECTION node + * @param partSchema Partition expression schema + * @return True if the query is partition-column based simple query. + */ + public static boolean checkIfPartitionSelection(EvalNode node, Schema partSchema) { + if (node != null && node instanceof BinaryEval) { + BinaryEval eval = (BinaryEval)node; + EvalNode left = eval.getLeftExpr(); + EvalNode right = eval.getRightExpr(); + EvalType type = eval.getType(); + + if (type == EvalType.EQUAL) { + if (left instanceof FieldEval && right instanceof ConstEval && partSchema.contains(((FieldEval) left).getColumnName())) { + return true; + } else if (left instanceof ConstEval && right instanceof FieldEval && partSchema.contains(((FieldEval) right).getColumnName())) { + return true; + } + } else if (type == EvalType.AND && left instanceof BinaryEval && right instanceof BinaryEval) { + return checkIfPartitionSelection(left, partSchema) && checkIfPartitionSelection(right, partSchema); + } + } + return false; + } + + /** + * Get partition constant value associated with `columnName`. + * + * @param node EvalNode having query predicates + * @param columnName Column name to be looked up + * @return String The value associated with `columnName` in the predicates + */ + public static String getPartitionValue(EvalNode node, String columnName) { + if (node != null && node instanceof BinaryEval) { + BinaryEval eval = (BinaryEval)node; + EvalNode left = eval.getLeftExpr(); + EvalNode right = eval.getRightExpr(); + EvalType type = eval.getType(); + + if (type == EvalType.EQUAL) { + if (left instanceof FieldEval && right instanceof ConstEval && columnName.equals(((FieldEval) left).getColumnName())) { + return ((ConstEval)right).getValue().toString(); + } else if (left instanceof ConstEval && right instanceof FieldEval && columnName.equals(((FieldEval) right).getColumnName())) { + return ((ConstEval)left).getValue().toString(); + } + } else if (type == EvalType.AND && left instanceof BinaryEval && right instanceof BinaryEval) { + String value = getPartitionValue(left, columnName); + if (value == null) { + value = getPartitionValue(right, columnName); + } + return value; + } + } + return null; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/8d0146b8/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java index 0fbd359..b09fc9e 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java @@ -100,6 +100,7 @@ public class PlannerUtil { PlannerUtil.getRelationLineage(plan.getRootBlock().getRoot()).length == 1; boolean noComplexComputation = false; + boolean prefixPartitionWhere = false; if (singleRelation) { ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN); if (scanNode == null) { @@ -133,11 +134,34 @@ public class PlannerUtil { } } } + + /** + * TODO: Remove isExternal check after resolving the following issues + * - TAJO-1416: INSERT INTO EXTERNAL PARTITIONED TABLE + * - TAJO-1441: INSERT INTO MANAGED PARTITIONED TABLE + */ + if (!noWhere && scanNode.getTableDesc().isExternal() && scanNode.getTableDesc().getPartitionMethod() != null) { + EvalNode node = ((SelectionNode) plan.getRootBlock().getNode(NodeType.SELECTION)).getQual(); + Schema partSchema = scanNode.getTableDesc().getPartitionMethod().getExpressionSchema(); + if (EvalTreeUtil.checkIfPartitionSelection(node, partSchema)) { + prefixPartitionWhere = true; + boolean isPrefix = true; + for (Column c : partSchema.getColumns()) { + String value = EvalTreeUtil.getPartitionValue(node, c.getSimpleName()); + if (isPrefix && value == null) + isPrefix = false; + else if (!isPrefix && value != null) { + prefixPartitionWhere = false; + break; + } + } + } + } } return !checkIfDDLPlan(rootNode) && (simpleOperator && noComplexComputation && isOneQueryBlock && - noOrderBy && noGroupBy && noWhere && noJoin && singleRelation); + noOrderBy && noGroupBy && (noWhere || prefixPartitionWhere) && noJoin && singleRelation); } /** http://git-wip-us.apache.org/repos/asf/tajo/blob/8d0146b8/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java index c427940..8d425b4 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java @@ -149,6 +149,21 @@ public class FileStorageManager extends StorageManager { return new Path(tableBaseDir, tableName); } + private String partitionPath = ""; + private int currentDepth = 0; + + /** + * Set a specific partition path for partition-column only queries + * @param path The partition prefix path + */ + public void setPartitionPath(String path) { partitionPath = path; } + + /** + * Set a depth of partition path for partition-column only queries + * @param depth Depth of partitions + */ + public void setCurrentDepth(int depth) { currentDepth = depth; } + @VisibleForTesting public Appender getAppender(TableMeta meta, Schema schema, Path filePath) throws IOException { @@ -722,8 +737,16 @@ public class FileStorageManager extends StorageManager { List<FileStatus> nonZeroLengthFiles = new ArrayList<FileStatus>(); if (fs.exists(tablePath)) { - getNonZeroLengthDataFiles(fs, tablePath, nonZeroLengthFiles, currentPage, numResultFragments, - new AtomicInteger(0), tableDesc.hasPartition(), 0, partitionDepth); + if (!partitionPath.isEmpty()) { + Path partPath = new Path(tableDesc.getPath() + partitionPath); + if (fs.exists(partPath)) { + getNonZeroLengthDataFiles(fs, partPath, nonZeroLengthFiles, currentPage, numResultFragments, + new AtomicInteger(0), tableDesc.hasPartition(), this.currentDepth, partitionDepth); + } + } else { + getNonZeroLengthDataFiles(fs, tablePath, nonZeroLengthFiles, currentPage, numResultFragments, + new AtomicInteger(0), tableDesc.hasPartition(), 0, partitionDepth); + } } List<Fragment> fragments = new ArrayList<Fragment>();
