Repository: tajo Updated Branches: refs/heads/branch-0.11.0 03d2dd2d8 -> 13f42cf40
http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 39013df..f08a9f6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -40,6 +40,7 @@ import org.apache.tajo.engine.planner.physical.EvalExprExec; import org.apache.tajo.engine.planner.physical.InsertRowsExec; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.exception.DuplicateIndexException; +import org.apache.tajo.exception.TajoException; import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.ipc.ClientProtos.SerializedResultSet; @@ -492,19 +493,7 @@ public class QueryExecutor { SubmitQueryResponse.Builder responseBuilder) throws Exception { LogicalRootNode rootNode = plan.getRootBlock().getRoot(); - TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, plan.getRootBlock().getRoot()); - if (tableDesc != null) { - - Tablespace space = TablespaceManager.get(tableDesc.getUri()).get(); - FormatProperty formatProperty = space.getFormatProperty(tableDesc.getMeta()); - - if (!formatProperty.isInsertable()) { - throw new UnsupportedException( - String.format("INSERT operation on %s tablespace", tableDesc.getUri().toString())); - } - - space.prepareTable(rootNode.getChild()); - } + prepareForCreateTableOrInsert(catalog, plan); hookManager.doHooks(queryContext, plan); @@ -524,6 +513,24 @@ public class QueryExecutor { " is forwarded to " + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort()); } + private void prepareForCreateTableOrInsert(CatalogService catalog, LogicalPlan plan) + throws TajoException, IOException { + LogicalRootNode rootNode = plan.getRootBlock().getRoot(); + TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, plan.getRootBlock().getRoot()); + if (tableDesc != null) { + + Tablespace space = TablespaceManager.get(tableDesc.getUri()).get(); + FormatProperty formatProperty = space.getFormatProperty(tableDesc.getMeta()); + + if (!formatProperty.isInsertable()) { + throw new UnsupportedException ( + String.format("INSERT operation on %s tablespace", tableDesc.getUri().toString())); + } + + space.prepareTable(rootNode.getChild()); + } + } + private void checkIndexExistence(final QueryContext queryContext, final CreateIndexNode createIndexNode) throws DuplicateIndexException { http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java index c471aea..a029802 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java @@ -35,6 +35,8 @@ import org.apache.tajo.TajoProtos; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.error.Errors.ResultCode; +import org.apache.tajo.exception.ReturnStateUtil; import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol.QueryCoordinatorProtocolService; import org.apache.tajo.ResourceProtos.TajoHeartbeatRequest; @@ -44,6 +46,7 @@ import org.apache.tajo.master.event.QueryStartEvent; import org.apache.tajo.master.event.QueryStopEvent; import org.apache.tajo.rpc.*; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.TUtil; import org.apache.tajo.util.history.HistoryReader; @@ -344,6 +347,9 @@ public class QueryMaster extends CompositeService implements EventHandler { } builder.setQueryProgress(queryMasterTask.getQuery().getProgress()); } + if (queryMasterTask.isInitError()) { + builder.setStatusMessage(ReturnStateUtil.returnError(queryMasterTask.getInitError()).getMessage()); + } return builder.build(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java index 0f089d5..52e0a96 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java @@ -460,25 +460,27 @@ public class QueryMasterTask extends CompositeService { } private void cleanupQuery(final QueryId queryId) { - Set<InetSocketAddress> workers = Sets.newHashSet(); - for (Stage stage : getQuery().getStages()) { - workers.addAll(stage.getAssignedWorkerMap().values()); - } + if (getQuery() != null) { + Set<InetSocketAddress> workers = Sets.newHashSet(); + for (Stage stage : getQuery().getStages()) { + workers.addAll(stage.getAssignedWorkerMap().values()); + } - LOG.info("Cleanup resources of all workers. Query: " + queryId + ", workers: " + workers.size()); - for (final InetSocketAddress worker : workers) { - queryMasterContext.getEventExecutor().submit(new Runnable() { - @Override - public void run() { - try { - AsyncRpcClient rpc = RpcClientManager.getInstance().getClient(worker, TajoWorkerProtocol.class, true); - TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub(); - tajoWorkerProtocolService.stopQuery(null, queryId.getProto(), NullCallback.get()); - } catch (Throwable e) { - LOG.error(e.getMessage(), e); + LOG.info("Cleanup resources of all workers. Query: " + queryId + ", workers: " + workers.size()); + for (final InetSocketAddress worker : workers) { + queryMasterContext.getEventExecutor().submit(new Runnable() { + @Override + public void run() { + try { + AsyncRpcClient rpc = RpcClientManager.getInstance().getClient(worker, TajoWorkerProtocol.class, true); + TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub(); + tajoWorkerProtocolService.stopQuery(null, queryId.getProto(), NullCallback.get()); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + } } - } - }); + }); + } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index 57bedd2..6f92344 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -72,7 +72,7 @@ public class ExecutionBlockContext { private TajoWorker.WorkerContext workerContext; private String plan; - private ExecutionBlockSharedResource resource; + private final ExecutionBlockSharedResource resource; private TajoQueryEngine queryEngine; private RpcClientManager connManager; http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index 5996118..a2f8c06 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@ -373,17 +373,6 @@ public class TaskAttemptContext { return fragmentMap.get(id).toArray(new FragmentProto[fragmentMap.get(id).size()]); } - public String getUniqueKeyFromFragments() { - StringBuilder sb = new StringBuilder(); - for (List<FragmentProto> fragments : fragmentMap.values()) { - for (FragmentProto f : fragments) { - FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, f); - sb.append(fileFragment.getPath().getName()).append(fileFragment.getStartKey()).append(fileFragment.getLength()); - } - } - return sb.toString(); - } - public int hashCode() { return Objects.hashCode(taskId); } http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/proto/ResourceProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/ResourceProtos.proto b/tajo-core/src/main/proto/ResourceProtos.proto index e789b81..a24c840 100644 --- a/tajo-core/src/main/proto/ResourceProtos.proto +++ b/tajo-core/src/main/proto/ResourceProtos.proto @@ -208,7 +208,7 @@ message NodeHeartbeatResponse { repeated QueryIdProto queryId = 3; } -//deplecated +// deprecated message TajoHeartbeatRequest { required WorkerConnectionInfoProto connectionInfo = 1; optional QueryIdProto queryId = 2; @@ -218,7 +218,7 @@ message TajoHeartbeatRequest { optional float queryProgress = 6; } -//deplecated +// deprecated message TajoHeartbeatResponse { message ResponseCommand { required string command = 1; http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java index 4b17b0e..f70731f 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java @@ -1267,6 +1267,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex JoinNode join = plan.createNode(JoinNode.class); join.init(JoinType.CROSS, left, right); join.setInSchema(merged); + block.addJoinType(join.getJoinType()); EvalNode evalNode; List<String> newlyEvaluatedExprs = TUtil.newList(); http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java b/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java index 3348097..aedf31e 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java @@ -364,23 +364,23 @@ public class GreedyHeuristicJoinOrderAlgorithm implements JoinOrderAlgorithm { // TODO - improve cost estimation // for outer joins, filter factor does not matter case LEFT_OUTER: - factor *= SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getSchema()) / - SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getLeftVertex().getSchema()); + factor *= (float)SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getSchema()) / + (float)SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getLeftVertex().getSchema()); break; case RIGHT_OUTER: - factor *= SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getSchema()) / - SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getRightVertex().getSchema()); + factor *= (float)SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getSchema()) / + (float)SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getRightVertex().getSchema()); break; case FULL_OUTER: - factor *= Math.max(SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getSchema()) / - SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getLeftVertex().getSchema()), - SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getSchema()) / - SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getRightVertex().getSchema())); + factor *= Math.max((float)SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getSchema()) / + (float)SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getLeftVertex().getSchema()), + (float)SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getSchema()) / + (float)SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getRightVertex().getSchema())); break; case LEFT_ANTI: case LEFT_SEMI: factor *= DEFAULT_SELECTION_FACTOR * SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getSchema()) / - SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getLeftVertex().getSchema()); + (float)SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getLeftVertex().getSchema()); break; case INNER: default: @@ -388,7 +388,7 @@ public class GreedyHeuristicJoinOrderAlgorithm implements JoinOrderAlgorithm { // filter factor * output tuple width / input tuple width factor *= Math.pow(DEFAULT_SELECTION_FACTOR, joinEdge.getJoinQual().size()) * SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getSchema()) - / (SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getLeftVertex().getSchema()) + / (float)(SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getLeftVertex().getSchema()) + SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getRightVertex().getSchema())); break; } http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java index 419b3e5..83bb700 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java @@ -39,7 +39,6 @@ import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext; import org.apache.tajo.plan.rewrite.rules.FilterPushDownRule.FilterPushDownContext; import org.apache.tajo.plan.rewrite.rules.IndexScanInfo.SimplePredicate; -import org.apache.tajo.plan.util.IndexUtil; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; import org.apache.tajo.util.TUtil; @@ -965,7 +964,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo databaseName = CatalogUtil.extractQualifier(table.getName()); tableName = CatalogUtil.extractSimpleName(table.getName()); Set<Predicate> predicates = TUtil.newHashSet(); - for (EvalNode eval : IndexUtil.getAllEqualEvals(qual)) { + for (EvalNode eval : PlannerUtil.getAllEqualEvals(qual)) { BinaryEval binaryEval = (BinaryEval) eval; // TODO: consider more complex predicates if (binaryEval.getLeftExpr().getType() == EvalType.FIELD && http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-plan/src/main/java/org/apache/tajo/plan/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/IndexUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/IndexUtil.java deleted file mode 100644 index 8f847e0..0000000 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/IndexUtil.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.plan.util; - -import org.apache.tajo.catalog.SortSpec; -import org.apache.tajo.plan.expr.*; - -import java.util.LinkedList; -import java.util.List; -import java.util.Stack; - -public class IndexUtil { - - public static String getIndexName(String indexName , SortSpec[] keys) { - StringBuilder builder = new StringBuilder(); - builder.append(indexName + "_"); - for(int i = 0 ; i < keys.length ; i ++) { - builder.append(keys[i].getSortKey().getSimpleName() + "_"); - } - return builder.toString(); - } - - public static List<EvalNode> getAllEqualEvals(EvalNode qual) { - EvalTreeUtil.EvalFinder finder = new EvalTreeUtil.EvalFinder(EvalType.EQUAL); - finder.visit(null, qual, new Stack<EvalNode>()); - return finder.getEvalNodes(); - } - - private static class FieldAndValueFinder implements EvalNodeVisitor { - private LinkedList<BinaryEval> nodeList = new LinkedList<BinaryEval>(); - - public LinkedList<BinaryEval> getNodeList () { - return this.nodeList; - } - - @Override - public void visit(EvalNode node) { - BinaryEval binaryEval = (BinaryEval) node; - switch(node.getType()) { - case AND: - break; - case EQUAL: - if( binaryEval.getLeftExpr().getType() == EvalType.FIELD - && binaryEval.getRightExpr().getType() == EvalType.CONST ) { - nodeList.add(binaryEval); - } - break; - case IS_NULL: - if( binaryEval.getLeftExpr().getType() == EvalType.FIELD - && binaryEval.getRightExpr().getType() == EvalType.CONST) { - nodeList.add(binaryEval); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java index a9dca4c..d9fb218 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java @@ -992,4 +992,10 @@ public class PlannerUtil { } return inSubqueries; } + + public static List<EvalNode> getAllEqualEvals(EvalNode qual) { + EvalTreeUtil.EvalFinder finder = new EvalTreeUtil.EvalFinder(EvalType.EQUAL); + finder.visit(null, qual, new Stack<EvalNode>()); + return finder.getEvalNodes(); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java new file mode 100644 index 0000000..a46d66e --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.plan.verifier; + +import org.apache.tajo.algebra.JoinType; +import org.apache.tajo.exception.InvalidInputsForCrossJoin; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.exception.TooLargeInputForCrossJoinException; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.verifier.PostLogicalPlanVerifier.Context; +import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; +import org.apache.tajo.util.TUtil; + +import java.util.List; +import java.util.Stack; + +/** + * + * PostLogicalPlanVerifier verifies the logical plan with some physical information. + */ +public class PostLogicalPlanVerifier extends BasicLogicalPlanVisitor<Context, Object> { + + static class Context { + long bcastLimitForCrossJoin; + VerificationState state; + + public Context(VerificationState state, long bcastLimitForCrossJoin) { + this.state = state; + this.bcastLimitForCrossJoin = bcastLimitForCrossJoin; + } + } + + public VerificationState verify(long broadcastThresholdForCrossJoin, VerificationState state, LogicalPlan plan) + throws TajoException { + Context context = new Context(state, broadcastThresholdForCrossJoin); + visit(context, plan, plan.getRootBlock()); + return context.state; + } + + @Override + public Object visitJoin(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode node, + Stack<LogicalNode> stack) throws TajoException { + super.visitJoin(context, plan, block, node, stack); + + if (node.getJoinType() == JoinType.CROSS) { + + ///////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // + // Cross join is one of the most heavy operations. To avoid the trouble caused by exhausting resources to perform + // cross join, we allow it only when it does not burden cluster too much. + // + // Currently, we simply allow the cross join only when it has at least one of inputs is a broadcastable relation. + // However, we can lose a lot of possible opportunities because this rule is too simple. + // This rule must be improved as follows. + // + // If the join type is cross, the following two restrictions are checked. + // 1) The expected result size does not exceed the predefined threshold. + // 2) Cross join must be executed with broadcast join. + // + // For the second restriction, the following two conditions must be satisfied. + // 1) There is at most a single relation which size is greater than the broadcast join threshold for non-cross + // join. + // 2) At least one of the cross join's inputs must not exceed the broadcast join threshold for cross join. + // + ///////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + if (!isSimpleRelationNode(node.getLeftChild()) && !isSimpleRelationNode(node.getRightChild())) { + context.state.addVerification(new InvalidInputsForCrossJoin()); + } else { + + boolean crossJoinAllowed = false; + List<String> largeRelationNames = TUtil.newList(); + + if (isSimpleRelationNode(node.getLeftChild())) { + if (getTableVolume((ScanNode) node.getLeftChild()) <= context.bcastLimitForCrossJoin) { + crossJoinAllowed = true; + } else { + largeRelationNames.add(((ScanNode) node.getLeftChild()).getCanonicalName()); + } + } + + if (isSimpleRelationNode(node.getRightChild())) { + if (getTableVolume((ScanNode) node.getRightChild()) <= context.bcastLimitForCrossJoin) { + crossJoinAllowed = true; + } else { + largeRelationNames.add(((ScanNode) node.getRightChild()).getCanonicalName()); + } + + if (!crossJoinAllowed) { + context.state.addVerification(new TooLargeInputForCrossJoinException( + largeRelationNames.toArray(new String[largeRelationNames.size()]), + context.bcastLimitForCrossJoin)); + } + } + } + + } + return null; + } + + private static boolean isSimpleRelationNode(LogicalNode node) { + if (node instanceof ScanNode) { + // PartitionedTableScanNode and IndexScanNode extends ScanNode. + // TableSubqueryNode is not the simple relation node. + return true; + } else { + return false; + } + } + + /** + * Get a volume of a table of a partitioned table + * @param scanNode ScanNode corresponding to a table + * @return table volume (bytes) + */ + private static long getTableVolume(ScanNode scanNode) { + if (scanNode.getTableDesc().hasStats()) { + long scanBytes = scanNode.getTableDesc().getStats().getNumBytes(); + if (scanNode.getType() == NodeType.PARTITIONS_SCAN) { + PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) scanNode; + if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) { + scanBytes = 0L; + } + } + + return scanBytes; + } else { + return -1; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-plan/src/main/proto/Plan.proto ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto index da1e187..0bfac0d 100644 --- a/tajo-plan/src/main/proto/Plan.proto +++ b/tajo-plan/src/main/proto/Plan.proto @@ -608,8 +608,6 @@ message OutputDistinctEnforce { message JoinEnforce { enum JoinAlgorithm { - NESTED_LOOP_JOIN = 0; - BLOCK_NESTED_LOOP_JOIN = 1; IN_MEMORY_HASH_JOIN = 2; HYBRID_HASH_JOIN = 3; MERGE_JOIN = 4;
