Repository: tajo
Updated Branches:
  refs/heads/master cb6fe8076 -> 1a7c353c2


http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java 
b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 39013df..f08a9f6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -40,6 +40,7 @@ import org.apache.tajo.engine.planner.physical.EvalExprExec;
 import org.apache.tajo.engine.planner.physical.InsertRowsExec;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.DuplicateIndexException;
+import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
@@ -492,19 +493,7 @@ public class QueryExecutor {
                                       SubmitQueryResponse.Builder 
responseBuilder) throws Exception {
     LogicalRootNode rootNode = plan.getRootBlock().getRoot();
 
-    TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, 
plan.getRootBlock().getRoot());
-    if (tableDesc != null) {
-
-      Tablespace space = TablespaceManager.get(tableDesc.getUri()).get();
-      FormatProperty formatProperty = 
space.getFormatProperty(tableDesc.getMeta());
-
-      if (!formatProperty.isInsertable()) {
-        throw new UnsupportedException(
-            String.format("INSERT operation on %s tablespace", 
tableDesc.getUri().toString()));
-      }
-
-      space.prepareTable(rootNode.getChild());
-    }
+    prepareForCreateTableOrInsert(catalog, plan);
 
     hookManager.doHooks(queryContext, plan);
 
@@ -524,6 +513,24 @@ public class QueryExecutor {
         " is forwarded to " + queryInfo.getQueryMasterHost() + ":" + 
queryInfo.getQueryMasterPort());
   }
 
+  private void prepareForCreateTableOrInsert(CatalogService catalog, 
LogicalPlan plan)
+      throws TajoException, IOException {
+    LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+    TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, 
plan.getRootBlock().getRoot());
+    if (tableDesc != null) {
+
+      Tablespace space = TablespaceManager.get(tableDesc.getUri()).get();
+      FormatProperty formatProperty = 
space.getFormatProperty(tableDesc.getMeta());
+
+      if (!formatProperty.isInsertable()) {
+        throw new UnsupportedException (
+            String.format("INSERT operation on %s tablespace", 
tableDesc.getUri().toString()));
+      }
+
+      space.prepareTable(rootNode.getChild());
+    }
+  }
+
   private void checkIndexExistence(final QueryContext queryContext, final 
CreateIndexNode createIndexNode)
       throws DuplicateIndexException {
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index c471aea..a029802 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -35,6 +35,8 @@ import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.error.Errors.ResultCode;
+import org.apache.tajo.exception.ReturnStateUtil;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol;
 import 
org.apache.tajo.ipc.QueryCoordinatorProtocol.QueryCoordinatorProtocolService;
 import org.apache.tajo.ResourceProtos.TajoHeartbeatRequest;
@@ -44,6 +46,7 @@ import org.apache.tajo.master.event.QueryStartEvent;
 import org.apache.tajo.master.event.QueryStopEvent;
 import org.apache.tajo.rpc.*;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.history.HistoryReader;
@@ -344,6 +347,9 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
       }
       builder.setQueryProgress(queryMasterTask.getQuery().getProgress());
     }
+    if (queryMasterTask.isInitError()) {
+      
builder.setStatusMessage(ReturnStateUtil.returnError(queryMasterTask.getInitError()).getMessage());
+    }
     return builder.build();
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index 0f089d5..52e0a96 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@ -460,25 +460,27 @@ public class QueryMasterTask extends CompositeService {
   }
 
   private void cleanupQuery(final QueryId queryId) {
-    Set<InetSocketAddress> workers = Sets.newHashSet();
-    for (Stage stage : getQuery().getStages()) {
-      workers.addAll(stage.getAssignedWorkerMap().values());
-    }
+    if (getQuery() != null) {
+      Set<InetSocketAddress> workers = Sets.newHashSet();
+      for (Stage stage : getQuery().getStages()) {
+        workers.addAll(stage.getAssignedWorkerMap().values());
+      }
 
-    LOG.info("Cleanup resources of all workers. Query: " + queryId + ", 
workers: " + workers.size());
-    for (final InetSocketAddress worker : workers) {
-      queryMasterContext.getEventExecutor().submit(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            AsyncRpcClient rpc = 
RpcClientManager.getInstance().getClient(worker, TajoWorkerProtocol.class, 
true);
-            TajoWorkerProtocol.TajoWorkerProtocolService 
tajoWorkerProtocolService = rpc.getStub();
-            tajoWorkerProtocolService.stopQuery(null, queryId.getProto(), 
NullCallback.get());
-          } catch (Throwable e) {
-            LOG.error(e.getMessage(), e);
+      LOG.info("Cleanup resources of all workers. Query: " + queryId + ", 
workers: " + workers.size());
+      for (final InetSocketAddress worker : workers) {
+        queryMasterContext.getEventExecutor().submit(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              AsyncRpcClient rpc = 
RpcClientManager.getInstance().getClient(worker, TajoWorkerProtocol.class, 
true);
+              TajoWorkerProtocol.TajoWorkerProtocolService 
tajoWorkerProtocolService = rpc.getStub();
+              tajoWorkerProtocolService.stopQuery(null, queryId.getProto(), 
NullCallback.get());
+            } catch (Throwable e) {
+              LOG.error(e.getMessage(), e);
+            }
           }
-        }
-      });
+        });
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 57bedd2..6f92344 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -72,7 +72,7 @@ public class ExecutionBlockContext {
   private TajoWorker.WorkerContext workerContext;
   private String plan;
 
-  private ExecutionBlockSharedResource resource;
+  private final ExecutionBlockSharedResource resource;
 
   private TajoQueryEngine queryEngine;
   private RpcClientManager connManager;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 5996118..a2f8c06 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -373,17 +373,6 @@ public class TaskAttemptContext {
     return fragmentMap.get(id).toArray(new 
FragmentProto[fragmentMap.get(id).size()]);
   }
 
-  public String getUniqueKeyFromFragments() {
-    StringBuilder sb = new StringBuilder();
-    for (List<FragmentProto> fragments : fragmentMap.values()) {
-      for (FragmentProto f : fragments) {
-        FileFragment fileFragment = 
FragmentConvertor.convert(FileFragment.class, f);
-        
sb.append(fileFragment.getPath().getName()).append(fileFragment.getStartKey()).append(fileFragment.getLength());
-      }
-    }
-    return sb.toString();
-  }
-
   public int hashCode() {
     return Objects.hashCode(taskId);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/proto/ResourceProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/ResourceProtos.proto 
b/tajo-core/src/main/proto/ResourceProtos.proto
index e789b81..a24c840 100644
--- a/tajo-core/src/main/proto/ResourceProtos.proto
+++ b/tajo-core/src/main/proto/ResourceProtos.proto
@@ -208,7 +208,7 @@ message NodeHeartbeatResponse {
   repeated QueryIdProto queryId = 3;
 }
 
-//deplecated
+// deprecated
 message TajoHeartbeatRequest {
   required WorkerConnectionInfoProto connectionInfo = 1;
   optional QueryIdProto queryId = 2;
@@ -218,7 +218,7 @@ message TajoHeartbeatRequest {
   optional float queryProgress = 6;
 }
 
-//deplecated
+// deprecated
 message TajoHeartbeatResponse {
   message ResponseCommand {
       required string command = 1;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
index 4b17b0e..f70731f 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
@@ -1267,6 +1267,7 @@ public class LogicalPlanner extends 
BaseAlgebraVisitor<LogicalPlanner.PlanContex
     JoinNode join = plan.createNode(JoinNode.class);
     join.init(JoinType.CROSS, left, right);
     join.setInSchema(merged);
+    block.addJoinType(join.getJoinType());
 
     EvalNode evalNode;
     List<String> newlyEvaluatedExprs = TUtil.newList();

http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java
index 3348097..aedf31e 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java
@@ -364,23 +364,23 @@ public class GreedyHeuristicJoinOrderAlgorithm implements 
JoinOrderAlgorithm {
         // TODO - improve cost estimation
         // for outer joins, filter factor does not matter
         case LEFT_OUTER:
-          factor *= 
SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getSchema()) /
-              
SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getLeftVertex().getSchema());
+          factor *= 
(float)SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getSchema()) /
+              
(float)SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getLeftVertex().getSchema());
           break;
         case RIGHT_OUTER:
-          factor *= 
SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getSchema()) /
-              
SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getRightVertex().getSchema());
+          factor *= 
(float)SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getSchema()) /
+              
(float)SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getRightVertex().getSchema());
           break;
         case FULL_OUTER:
-          factor *= 
Math.max(SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getSchema()) /
-              
SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getLeftVertex().getSchema()),
-                  
SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getSchema()) /
-                  
SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getRightVertex().getSchema()));
+          factor *= 
Math.max((float)SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getSchema()) /
+                  
(float)SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getLeftVertex().getSchema()),
+              
(float)SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getSchema()) /
+                  
(float)SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getRightVertex().getSchema()));
           break;
         case LEFT_ANTI:
         case LEFT_SEMI:
           factor *= DEFAULT_SELECTION_FACTOR * 
SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getSchema()) /
-              
SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getLeftVertex().getSchema());
+              
(float)SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getLeftVertex().getSchema());
           break;
         case INNER:
         default:
@@ -388,7 +388,7 @@ public class GreedyHeuristicJoinOrderAlgorithm implements 
JoinOrderAlgorithm {
           // filter factor * output tuple width / input tuple width
           factor *= Math.pow(DEFAULT_SELECTION_FACTOR, 
joinEdge.getJoinQual().size())
               * SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getSchema())
-              / 
(SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getLeftVertex().getSchema())
+              / 
(float)(SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getLeftVertex().getSchema())
               + 
SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getRightVertex().getSchema()));
           break;
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
index 419b3e5..83bb700 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
@@ -39,7 +39,6 @@ import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
 import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
 import 
org.apache.tajo.plan.rewrite.rules.FilterPushDownRule.FilterPushDownContext;
 import org.apache.tajo.plan.rewrite.rules.IndexScanInfo.SimplePredicate;
-import org.apache.tajo.plan.util.IndexUtil;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
 import org.apache.tajo.util.TUtil;
@@ -965,7 +964,7 @@ public class FilterPushDownRule extends 
BasicLogicalPlanVisitor<FilterPushDownCo
       databaseName = CatalogUtil.extractQualifier(table.getName());
       tableName = CatalogUtil.extractSimpleName(table.getName());
       Set<Predicate> predicates = TUtil.newHashSet();
-      for (EvalNode eval : IndexUtil.getAllEqualEvals(qual)) {
+      for (EvalNode eval : PlannerUtil.getAllEqualEvals(qual)) {
         BinaryEval binaryEval = (BinaryEval) eval;
         // TODO: consider more complex predicates
         if (binaryEval.getLeftExpr().getType() == EvalType.FIELD &&

http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-plan/src/main/java/org/apache/tajo/plan/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/IndexUtil.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/util/IndexUtil.java
deleted file mode 100644
index 8f847e0..0000000
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/IndexUtil.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.plan.util;
-
-import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.plan.expr.*;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Stack;
-
-public class IndexUtil {
-  
-  public static String getIndexName(String indexName , SortSpec[] keys) {
-    StringBuilder builder = new StringBuilder();
-    builder.append(indexName + "_");
-    for(int i = 0 ; i < keys.length ; i ++) {
-      builder.append(keys[i].getSortKey().getSimpleName() + "_");
-    }
-    return builder.toString();
-  }
-
-  public static List<EvalNode> getAllEqualEvals(EvalNode qual) {
-    EvalTreeUtil.EvalFinder finder = new 
EvalTreeUtil.EvalFinder(EvalType.EQUAL);
-    finder.visit(null, qual, new Stack<EvalNode>());
-    return finder.getEvalNodes();
-  }
-  
-  private static class FieldAndValueFinder implements EvalNodeVisitor {
-    private LinkedList<BinaryEval> nodeList = new LinkedList<BinaryEval>();
-    
-    public LinkedList<BinaryEval> getNodeList () {
-      return this.nodeList;
-    }
-    
-    @Override
-    public void visit(EvalNode node) {
-      BinaryEval binaryEval = (BinaryEval) node;
-      switch(node.getType()) {
-      case AND:
-        break;
-      case EQUAL:
-        if( binaryEval.getLeftExpr().getType() == EvalType.FIELD
-          && binaryEval.getRightExpr().getType() == EvalType.CONST ) {
-          nodeList.add(binaryEval);
-        }
-        break;
-      case IS_NULL:
-        if( binaryEval.getLeftExpr().getType() == EvalType.FIELD
-          && binaryEval.getRightExpr().getType() == EvalType.CONST) {
-          nodeList.add(binaryEval);
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
index a9dca4c..d9fb218 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
@@ -992,4 +992,10 @@ public class PlannerUtil {
     }
     return inSubqueries;
   }
+
+  public static List<EvalNode> getAllEqualEvals(EvalNode qual) {
+    EvalTreeUtil.EvalFinder finder = new 
EvalTreeUtil.EvalFinder(EvalType.EQUAL);
+    finder.visit(null, qual, new Stack<EvalNode>());
+    return finder.getEvalNodes();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java
new file mode 100644
index 0000000..a46d66e
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.verifier;
+
+import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.exception.InvalidInputsForCrossJoin;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.exception.TooLargeInputForCrossJoinException;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.verifier.PostLogicalPlanVerifier.Context;
+import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
+import org.apache.tajo.util.TUtil;
+
+import java.util.List;
+import java.util.Stack;
+
+/**
+ *
+ * PostLogicalPlanVerifier verifies the logical plan with some physical 
information.
+ */
+public class PostLogicalPlanVerifier extends BasicLogicalPlanVisitor<Context, 
Object> {
+
+  static class Context {
+    long bcastLimitForCrossJoin;
+    VerificationState state;
+
+    public Context(VerificationState state, long bcastLimitForCrossJoin) {
+      this.state = state;
+      this.bcastLimitForCrossJoin = bcastLimitForCrossJoin;
+    }
+  }
+
+  public VerificationState verify(long broadcastThresholdForCrossJoin, 
VerificationState state, LogicalPlan plan)
+      throws TajoException {
+    Context context = new Context(state, broadcastThresholdForCrossJoin);
+    visit(context, plan, plan.getRootBlock());
+    return context.state;
+  }
+
+  @Override
+  public Object visitJoin(Context context, LogicalPlan plan, 
LogicalPlan.QueryBlock block, JoinNode node,
+                          Stack<LogicalNode> stack) throws TajoException {
+    super.visitJoin(context, plan, block, node, stack);
+
+    if (node.getJoinType() == JoinType.CROSS) {
+
+      
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+      //
+      // Cross join is one of the most heavy operations. To avoid the trouble 
caused by exhausting resources to perform
+      // cross join, we allow it only when it does not burden cluster too much.
+      //
+      // Currently, we simply allow the cross join only when it has at least 
one of inputs is a broadcastable relation.
+      // However, we can lose a lot of possible opportunities because this 
rule is too simple.
+      // This rule must be improved as follows.
+      //
+      // If the join type is cross, the following two restrictions are checked.
+      // 1) The expected result size does not exceed the predefined threshold.
+      // 2) Cross join must be executed with broadcast join.
+      //
+      // For the second restriction, the following two conditions must be 
satisfied.
+      // 1) There is at most a single relation which size is greater than the 
broadcast join threshold for non-cross
+      // join.
+      // 2) At least one of the cross join's inputs must not exceed the 
broadcast join threshold for cross join.
+      //
+      
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+      if (!isSimpleRelationNode(node.getLeftChild()) && 
!isSimpleRelationNode(node.getRightChild())) {
+        context.state.addVerification(new InvalidInputsForCrossJoin());
+      } else {
+
+        boolean crossJoinAllowed = false;
+        List<String> largeRelationNames = TUtil.newList();
+
+        if (isSimpleRelationNode(node.getLeftChild())) {
+          if (getTableVolume((ScanNode) node.getLeftChild()) <= 
context.bcastLimitForCrossJoin) {
+            crossJoinAllowed = true;
+          } else {
+            largeRelationNames.add(((ScanNode) 
node.getLeftChild()).getCanonicalName());
+          }
+        }
+
+        if (isSimpleRelationNode(node.getRightChild())) {
+          if (getTableVolume((ScanNode) node.getRightChild()) <= 
context.bcastLimitForCrossJoin) {
+            crossJoinAllowed = true;
+          } else {
+            largeRelationNames.add(((ScanNode) 
node.getRightChild()).getCanonicalName());
+          }
+
+          if (!crossJoinAllowed) {
+            context.state.addVerification(new 
TooLargeInputForCrossJoinException(
+                largeRelationNames.toArray(new 
String[largeRelationNames.size()]),
+                context.bcastLimitForCrossJoin));
+          }
+        }
+      }
+
+    }
+    return null;
+  }
+
+  private static boolean isSimpleRelationNode(LogicalNode node) {
+    if (node instanceof ScanNode) {
+      // PartitionedTableScanNode and IndexScanNode extends ScanNode.
+      // TableSubqueryNode is not the simple relation node.
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Get a volume of a table of a partitioned table
+   * @param scanNode ScanNode corresponding to a table
+   * @return table volume (bytes)
+   */
+  private static long getTableVolume(ScanNode scanNode) {
+    if (scanNode.getTableDesc().hasStats()) {
+      long scanBytes = scanNode.getTableDesc().getStats().getNumBytes();
+      if (scanNode.getType() == NodeType.PARTITIONS_SCAN) {
+        PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) 
scanNode;
+        if (pScanNode.getInputPaths() == null || 
pScanNode.getInputPaths().length == 0) {
+          scanBytes = 0L;
+        }
+      }
+
+      return scanBytes;
+    } else {
+      return -1;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-plan/src/main/proto/Plan.proto
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/proto/Plan.proto 
b/tajo-plan/src/main/proto/Plan.proto
index da1e187..0bfac0d 100644
--- a/tajo-plan/src/main/proto/Plan.proto
+++ b/tajo-plan/src/main/proto/Plan.proto
@@ -608,8 +608,6 @@ message OutputDistinctEnforce {
 
 message JoinEnforce {
   enum JoinAlgorithm {
-    NESTED_LOOP_JOIN = 0;
-    BLOCK_NESTED_LOOP_JOIN = 1;
     IN_MEMORY_HASH_JOIN = 2;
     HYBRID_HASH_JOIN = 3;
     MERGE_JOIN = 4;

Reply via email to