http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java index 4d1cee1..0c3db6d 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java @@ -19,10 +19,13 @@ package org.apache.tajo.util; import com.google.common.base.Objects; +import com.google.common.base.Preconditions; import com.google.gson.annotations.Expose; +import org.apache.tajo.OverridableConf; import org.apache.tajo.common.ProtoObject; import org.apache.tajo.json.CommonGsonHelper; import org.apache.tajo.json.GsonObject; +import sun.misc.FloatingDecimal; import java.util.HashMap; import java.util.Map; @@ -32,6 +35,9 @@ import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueProto; import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetProto; public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, GsonObject { + public static final String TRUE_STR = "true"; + public static final String FALSE_STR = "false"; + private KeyValueSetProto.Builder builder = KeyValueSetProto.newBuilder(); @Expose private Map<String,String> keyVals; @@ -63,40 +69,133 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs public int size() { return keyVals.size(); } - - public void put(String key, String val) { - this.keyVals.put(key, val); - } public void putAll(Map<String, String> keyValues) { if (keyValues != null) { this.keyVals.putAll(keyValues); } } - - public void putAll(KeyValueSet keyValueSet) { + + public void putAll(KeyValueSet keyValueSet) { if (keyValueSet != null) { - this.keyVals.putAll(keyValueSet.keyVals); + this.keyVals.putAll(keyValueSet.keyVals); } - } - - public String get(String key) { - return this.keyVals.get(key); - } - - public String get(String key, String defaultVal) { - if(keyVals.containsKey(key)) - return keyVals.get(key); - else { - return defaultVal; - } - } - - public Map<String,String> getAllKeyValus() { - return keyVals; - } + } + + public Map<String,String> getAllKeyValus() { + return keyVals; + } + + public boolean containsKey(String key) { + return this.keyVals.containsKey(key); + } + + public void set(String key, String val) { + Preconditions.checkNotNull(key); + Preconditions.checkNotNull(val); + + this.keyVals.put(key, val); + } + + public String get(String key, String defaultVal) { + if(keyVals.containsKey(key)) { + return keyVals.get(key); + } else if (defaultVal != null) { + return defaultVal; + } else { + throw new IllegalArgumentException("No such a config key: " + key); + } + } + + public String get(String key) { + return get(key, null); + } + + public void setBool(String key, boolean val) { + set(key, val ? TRUE_STR : FALSE_STR); + } + + public boolean getBool(String key, Boolean defaultVal) { + if (containsKey(key)) { + String strVal = get(key, null); + return strVal != null ? strVal.equalsIgnoreCase(TRUE_STR) : false; + } else if (defaultVal != null) { + return defaultVal; + } else { + return false; + } + } + + public boolean getBool(String key) { + return getBool(key, null); + } + + public void setInt(String key, int val) { + set(key, String.valueOf(val)); + } + + public int getInt(String key, Integer defaultVal) { + if (containsKey(key)) { + String strVal = get(key, null); + return Integer.parseInt(strVal); + } else if (defaultVal != null) { + return defaultVal; + } else { + throw new IllegalArgumentException("No such a config key: " + key); + } + } + + public int getInt(String key) { + return getInt(key, null); + } + + public void setLong(String key, long val) { + set(key, String.valueOf(val)); + } + + public long getLong(String key, Long defaultVal) { + if (containsKey(key)) { + String strVal = get(key, null); + return Long.parseLong(strVal); + } else if (defaultVal != null) { + return defaultVal; + } else { + throw new IllegalArgumentException("No such a config key: " + key); + } + } + + public long getLong(String key) { + return getLong(key, null); + } + + public void setFloat(String key, float val) { + set(key, String.valueOf(val)); + } + + public float getFloat(String key, Float defaultVal) { + if (containsKey(key)) { + String strVal = get(key, null); + try { + sun.misc.FloatingDecimal fd = FloatingDecimal.readJavaFormatString(strVal); + if (Float.MAX_VALUE < fd.doubleValue()) { + throw new IllegalStateException("Parsed value is overflow in float type"); + } + return fd.floatValue(); + } catch (NumberFormatException nfe) { + throw new IllegalArgumentException("No such a config key: " + key); + } + } else if (defaultVal != null) { + return defaultVal.floatValue(); + } else { + throw new IllegalArgumentException("No such a config key: " + key); + } + } + + public float getFloat(String key) { + return getFloat(key, null); + } - public String delete(String key) { + public String remove(String key) { return keyVals.remove(key); }
http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-common/src/test/java/org/apache/tajo/datum/TestArithmeticOperator.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/test/java/org/apache/tajo/datum/TestArithmeticOperator.java b/tajo-common/src/test/java/org/apache/tajo/datum/TestArithmeticOperator.java index adf80b0..42623bd 100644 --- a/tajo-common/src/test/java/org/apache/tajo/datum/TestArithmeticOperator.java +++ b/tajo-common/src/test/java/org/apache/tajo/datum/TestArithmeticOperator.java @@ -52,9 +52,9 @@ public class TestArithmeticOperator { public void setUp() { TajoConf tajoConf = new TajoConf(); if ("Zero_Exception".equals(option)) { - tajoConf.setBoolVar(ConfVars.BEHAVIOR_ARITHMETIC_ABORT, true); + tajoConf.setBoolVar(ConfVars.$BEHAVIOR_ARITHMETIC_ABORT, true); } else { - tajoConf.setBoolVar(ConfVars.BEHAVIOR_ARITHMETIC_ABORT, false); + tajoConf.setBoolVar(ConfVars.$BEHAVIOR_ARITHMETIC_ABORT, false); } Datum.initAbortWhenDivideByZero(tajoConf); } http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java index 3ffeeb0..a43cc1a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java @@ -23,6 +23,7 @@ import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.tajo.SessionVars; import org.apache.tajo.algebra.JoinType; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; @@ -35,7 +36,7 @@ import org.apache.tajo.engine.planner.logical.join.GreedyHeuristicJoinOrderAlgor import org.apache.tajo.engine.planner.logical.join.JoinGraph; import org.apache.tajo.engine.planner.logical.join.JoinOrderAlgorithm; import org.apache.tajo.engine.planner.rewrite.*; -import org.apache.tajo.master.session.Session; +import org.apache.tajo.engine.query.QueryContext; import java.util.LinkedHashSet; import java.util.Set; @@ -57,7 +58,7 @@ public class LogicalOptimizer { public LogicalOptimizer(TajoConf systemConf) { rulesBeforeJoinOpt = new BasicQueryRewriteEngine(); - if (systemConf.getBoolVar(ConfVars.PLANNER_USE_FILTER_PUSHDOWN)) { + if (systemConf.getBoolVar(ConfVars.$TEST_FILTER_PUSHDOWN_ENABLED)) { rulesBeforeJoinOpt.addRewriteRule(new FilterPushDownRule()); } @@ -84,13 +85,13 @@ public class LogicalOptimizer { return optimize(null, plan); } - public LogicalNode optimize(Session session, LogicalPlan plan) throws PlanningException { + public LogicalNode optimize(QueryContext context, LogicalPlan plan) throws PlanningException { rulesBeforeJoinOpt.rewrite(plan); DirectedGraphCursor<String, BlockEdge> blockCursor = new DirectedGraphCursor<String, BlockEdge>(plan.getQueryBlockGraph(), plan.getRootBlock().getName()); - if (session == null || "true".equals(session.getVariable(ConfVars.OPTIMIZER_JOIN_ENABLE.varname, "true"))) { + if (context == null || context.getBool(SessionVars.TEST_JOIN_OPT_ENABLED)) { // default is true while (blockCursor.hasNext()) { optimizeJoinOrder(plan, blockCursor.nextBlock()); http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java index 86bacef..ee65b2b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java @@ -67,10 +67,8 @@ public class LogicalPlan { LogicalPlanner planner; private boolean isExplain; - private final String currentDatabase; - public LogicalPlan(String currentDatabase, LogicalPlanner planner) { - this.currentDatabase = currentDatabase; + public LogicalPlan(LogicalPlanner planner) { this.planner = planner; } http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java index 84fe6c2..6ee0ff8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java @@ -26,10 +26,10 @@ import org.apache.tajo.engine.eval.FieldEval; import org.apache.tajo.engine.exception.NoSuchColumnException; import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock; import org.apache.tajo.engine.planner.logical.*; -import org.apache.tajo.engine.planner.nameresolver.NameResolvingMode; import org.apache.tajo.engine.planner.nameresolver.NameResolver; +import org.apache.tajo.engine.planner.nameresolver.NameResolvingMode; +import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.engine.utils.SchemaUtil; -import org.apache.tajo.master.session.Session; import org.apache.tajo.util.TUtil; import java.util.*; @@ -42,18 +42,18 @@ public class LogicalPlanPreprocessor extends BaseAlgebraVisitor<LogicalPlanPrepr private ExprAnnotator annotator; public static class PreprocessContext { - public Session session; + public QueryContext queryContext; public LogicalPlan plan; public LogicalPlan.QueryBlock currentBlock; - public PreprocessContext(Session session, LogicalPlan plan, LogicalPlan.QueryBlock currentBlock) { - this.session = session; + public PreprocessContext(QueryContext queryContext, LogicalPlan plan, LogicalPlan.QueryBlock currentBlock) { + this.queryContext = queryContext; this.plan = plan; this.currentBlock = currentBlock; } public PreprocessContext(PreprocessContext context, LogicalPlan.QueryBlock currentBlock) { - this.session = context.session; + this.queryContext = context.queryContext; this.plan = context.plan; this.currentBlock = currentBlock; } @@ -104,7 +104,7 @@ public class LogicalPlanPreprocessor extends BaseAlgebraVisitor<LogicalPlanPrepr if (CatalogUtil.isFQTableName(asteriskExpr.getQualifier())) { qualifier = asteriskExpr.getQualifier(); } else { - qualifier = CatalogUtil.buildFQName(ctx.session.getCurrentDatabase(), asteriskExpr.getQualifier()); + qualifier = CatalogUtil.buildFQName(ctx.queryContext.getCurrentDatabase(), asteriskExpr.getQualifier()); } relationOp = block.getRelation(qualifier); @@ -359,7 +359,7 @@ public class LogicalPlanPreprocessor extends BaseAlgebraVisitor<LogicalPlanPrepr if (CatalogUtil.isFQTableName(expr.getName())) { actualRelationName = relation.getName(); } else { - actualRelationName = CatalogUtil.buildFQName(ctx.session.getCurrentDatabase(), relation.getName()); + actualRelationName = CatalogUtil.buildFQName(ctx.queryContext.getCurrentDatabase(), relation.getName()); } TableDesc desc = catalog.getTableDesc(actualRelationName); @@ -388,7 +388,7 @@ public class LogicalPlanPreprocessor extends BaseAlgebraVisitor<LogicalPlanPrepr // a table subquery should be dealt as a relation. TableSubQueryNode node = ctx.plan.createNode(TableSubQueryNode.class); - node.init(CatalogUtil.buildFQName(ctx.session.getCurrentDatabase(), expr.getName()), child); + node.init(CatalogUtil.buildFQName(ctx.queryContext.getCurrentDatabase(), expr.getName()), child); ctx.currentBlock.addRelation(node); return node; } http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java index bb8192f..6512ae0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java @@ -24,7 +24,7 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.logical.*; -import org.apache.tajo.master.session.Session; +import org.apache.tajo.engine.query.QueryContext; import java.util.Stack; @@ -38,17 +38,17 @@ public class LogicalPlanVerifier extends BasicLogicalPlanVisitor<LogicalPlanVeri } public static class Context { - Session session; + QueryContext queryContext; VerificationState state; - public Context(Session session, VerificationState state) { - this.session = session; + public Context(QueryContext queryContext, VerificationState state) { + this.queryContext = this.queryContext; this.state = state; } } - public VerificationState verify(Session session, VerificationState state, LogicalPlan plan) throws PlanningException { - Context context = new Context(session, state); + public VerificationState verify(QueryContext queryContext, VerificationState state, LogicalPlan plan) throws PlanningException { + Context context = new Context(queryContext, state); visit(context, plan, plan.getRootBlock()); return context.state; } http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java index a4820cb..35df11f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java @@ -42,17 +42,16 @@ import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock; import org.apache.tajo.engine.planner.logical.*; import org.apache.tajo.engine.planner.nameresolver.NameResolvingMode; import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule; +import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.engine.utils.SchemaUtil; -import org.apache.tajo.master.session.Session; import org.apache.tajo.storage.StorageUtil; -import org.apache.tajo.util.Pair; import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.util.Pair; import org.apache.tajo.util.TUtil; import java.util.*; import static org.apache.tajo.algebra.CreateTable.PartitionType; - import static org.apache.tajo.engine.planner.ExprNormalizer.ExprNormalizedResult; import static org.apache.tajo.engine.planner.LogicalPlan.BlockType; import static org.apache.tajo.engine.planner.LogicalPlanPreprocessor.PreprocessContext; @@ -75,7 +74,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex } public static class PlanContext { - Session session; + QueryContext queryContext; LogicalPlan plan; // transient data for each query block @@ -83,15 +82,15 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex boolean debugOrUnitTests; - public PlanContext(Session session, LogicalPlan plan, QueryBlock block, boolean debugOrUnitTests) { - this.session = session; + public PlanContext(QueryContext context, LogicalPlan plan, QueryBlock block, boolean debugOrUnitTests) { + this.queryContext = context; this.plan = plan; this.queryBlock = block; this.debugOrUnitTests = debugOrUnitTests; } public PlanContext(PlanContext context, QueryBlock block) { - this.session = context.session; + this.queryContext = context.queryContext; this.plan = context.plan; this.queryBlock = block; this.debugOrUnitTests = context.debugOrUnitTests; @@ -109,21 +108,21 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex * @param expr A relational algebraic expression for a query. * @return A logical plan */ - public LogicalPlan createPlan(Session session, Expr expr) throws PlanningException { - return createPlan(session, expr, false); + public LogicalPlan createPlan(QueryContext context, Expr expr) throws PlanningException { + return createPlan(context, expr, false); } @VisibleForTesting - public LogicalPlan createPlan(Session session, Expr expr, boolean debug) throws PlanningException { + public LogicalPlan createPlan(QueryContext queryContext, Expr expr, boolean debug) throws PlanningException { - LogicalPlan plan = new LogicalPlan(session.getCurrentDatabase(), this); + LogicalPlan plan = new LogicalPlan(this); QueryBlock rootBlock = plan.newAndGetBlock(LogicalPlan.ROOT_BLOCK); - PreprocessContext preProcessorCtx = new PreprocessContext(session, plan, rootBlock); + PreprocessContext preProcessorCtx = new PreprocessContext(queryContext, plan, rootBlock); preprocessor.visit(preProcessorCtx, new Stack<Expr>(), expr); plan.resetGeneratedId(); - PlanContext context = new PlanContext(session, plan, plan.getRootBlock(), debug); + PlanContext context = new PlanContext(queryContext, plan, plan.getRootBlock(), debug); LogicalNode topMostNode = this.visit(context, new Stack<Expr>(), expr); // Add Root Node @@ -1424,7 +1423,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex databaseName = CatalogUtil.extractQualifier(expr.getTableName()); tableName = CatalogUtil.extractSimpleName(expr.getTableName()); } else { - databaseName = context.session.getCurrentDatabase(); + databaseName = context.queryContext.getCurrentDatabase(); tableName = expr.getTableName(); } TableDesc desc = catalog.getTableDesc(databaseName, tableName); @@ -1624,7 +1623,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex if (CatalogUtil.isFQTableName(parentTableName) == false) { parentTableName = - CatalogUtil.buildFQName(context.session.getCurrentDatabase(), + CatalogUtil.buildFQName(context.queryContext.getCurrentDatabase(), parentTableName); } TableDesc parentTableDesc = catalog.getTableDesc(parentTableName); @@ -1657,7 +1656,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex createTableNode.setTableName(expr.getTableName()); } else { createTableNode.setTableName( - CatalogUtil.buildFQName(context.session.getCurrentDatabase(), expr.getTableName())); + CatalogUtil.buildFQName(context.queryContext.getCurrentDatabase(), expr.getTableName())); } // This is CREATE TABLE <tablename> LIKE <parentTable> if(expr.getLikeParentTableName() != null) @@ -1753,7 +1752,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex CreateTable.ColumnPartition partition = (CreateTable.ColumnPartition) expr; String partitionExpression = Joiner.on(',').join(partition.getColumns()); - partitionMethodDesc = new PartitionMethodDesc(context.session.getCurrentDatabase(), tableName, + partitionMethodDesc = new PartitionMethodDesc(context.queryContext.getCurrentDatabase(), tableName, CatalogProtos.PartitionType.COLUMN, partitionExpression, convertColumnsToSchema(partition.getColumns())); } else { throw new PlanningException(String.format("Not supported PartitonType: %s", expr.getPartitionType())); @@ -1816,7 +1815,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex if (CatalogUtil.isFQTableName(dropTable.getTableName())) { qualified = dropTable.getTableName(); } else { - qualified = CatalogUtil.buildFQName(context.session.getCurrentDatabase(), dropTable.getTableName()); + qualified = CatalogUtil.buildFQName(context.queryContext.getCurrentDatabase(), dropTable.getTableName()); } dropTableNode.init(qualified, dropTable.isIfExists(), dropTable.isPurge()); return dropTableNode; http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java index 6678e46..9f533e2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -28,6 +28,7 @@ import com.google.common.collect.ObjectArrays; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; +import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.catalog.proto.CatalogProtos; @@ -37,6 +38,7 @@ import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.logical.*; import org.apache.tajo.engine.planner.physical.*; +import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.exception.InternalException; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer; @@ -57,7 +59,6 @@ import java.util.Stack; import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionType; -import static org.apache.tajo.conf.TajoConf.ConfVars; import static org.apache.tajo.ipc.TajoWorkerProtocol.ColumnPartitionEnforcer.ColumnPartitionAlgorithm; import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty; import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType; @@ -68,7 +69,6 @@ import static org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce; public class PhysicalPlannerImpl implements PhysicalPlanner { private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class); private static final int UNGENERATED_PID = -1; - private final long INNER_JOIN_INMEMORY_HASH_THRESHOLD; protected final TajoConf conf; protected final AbstractStorageManager sm; @@ -76,8 +76,6 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { public PhysicalPlannerImpl(final TajoConf conf, final AbstractStorageManager sm) { this.conf = conf; this.sm = sm; - - this.INNER_JOIN_INMEMORY_HASH_THRESHOLD = conf.getLongVar(ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD); } public PhysicalExec createPlan(final TaskAttemptContext context, final LogicalNode logicalPlan) @@ -258,7 +256,16 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { throws IOException { String [] lineage = PlannerUtil.getRelationLineage(node); long volume = estimateSizeRecursive(context, lineage); - boolean inMemoryInnerJoinFlag = volume <= INNER_JOIN_INMEMORY_HASH_THRESHOLD; + boolean inMemoryInnerJoinFlag = false; + + QueryContext queryContext = context.getQueryContext(); + + if (queryContext.containsKey(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT)) { + inMemoryInnerJoinFlag = volume <= context.getQueryContext().getLong(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT); + } else { + inMemoryInnerJoinFlag = volume <= context.getQueryContext().getLong(SessionVars.HASH_JOIN_SIZE_LIMIT); + } + LOG.info(String.format("[%s] the volume of %s relations (%s) is %s and is %sfit to main maemory.", context.getTaskId().toString(), (left ? "Left" : "Right"), @@ -470,8 +477,17 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { PhysicalExec leftExec, PhysicalExec rightExec) throws IOException { String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild()); long rightTableVolume = estimateSizeRecursive(context, rightLineage); + boolean hashJoin; - if (rightTableVolume < conf.getLongVar(ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)) { + QueryContext queryContext = context.getQueryContext(); + + if (queryContext.containsKey(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT)) { + hashJoin = rightTableVolume < queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT); + } else { + hashJoin = rightTableVolume < queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT); + } + + if (hashJoin) { // we can implement left outer join using hash join, using the right operand as the build relation LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join]."); return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec); @@ -488,8 +504,18 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { //if the left operand is small enough => implement it as a left outer hash join with exchanged operators (note: // blocking, but merge join is blocking as well) String [] outerLineage4 = PlannerUtil.getRelationLineage(plan.getLeftChild()); - long outerSize = estimateSizeRecursive(context, outerLineage4); - if (outerSize < conf.getLongVar(ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)){ + long leftTableVolume = estimateSizeRecursive(context, outerLineage4); + boolean hashJoin; + + QueryContext queryContext = context.getQueryContext(); + + if (queryContext.containsKey(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT)) { + hashJoin = leftTableVolume < queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT); + } else { + hashJoin = leftTableVolume < queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT); + } + + if (hashJoin){ LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join]."); return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec); } else { @@ -971,7 +997,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { String [] outerLineage = PlannerUtil.getRelationLineage(groupbyNode.getChild()); long estimatedSize = estimateSizeRecursive(context, outerLineage); - final long threshold = conf.getLongVar(ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD); + final long threshold = context.getQueryContext().getLong(SessionVars.HASH_GROUPBY_SIZE_LIMIT); // if the relation size is less than the threshold, // the hash aggregation will be used. http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java index 2d6c095..75dcc18 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java @@ -24,7 +24,7 @@ import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.master.session.Session; +import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.util.TUtil; import java.util.Set; @@ -38,17 +38,17 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <PreLogicalPlanVe } public static class Context { - Session session; + QueryContext queryContext; VerificationState state; - public Context(Session session, VerificationState state) { - this.session = session; + public Context(QueryContext queryContext, VerificationState state) { + this.queryContext = queryContext; this.state = state; } } - public VerificationState verify(Session session, VerificationState state, Expr expr) throws PlanningException { - Context context = new Context(session, state); + public VerificationState verify(QueryContext queryContext, VerificationState state, Expr expr) throws PlanningException { + Context context = new Context(queryContext, state); visit(context, new Stack<Expr>(), expr); return context.state; } @@ -127,7 +127,7 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <PreLogicalPlanVe if (CatalogUtil.isFQTableName(tableName)) { qualifiedName = tableName; } else { - qualifiedName = CatalogUtil.buildFQName(context.session.getCurrentDatabase(), tableName); + qualifiedName = CatalogUtil.buildFQName(context.queryContext.getCurrentDatabase(), tableName); } if (!catalog.existsTable(qualifiedName)) { @@ -143,7 +143,10 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <PreLogicalPlanVe if (CatalogUtil.isFQTableName(tableName)) { qualifiedName = tableName; } else { - qualifiedName = CatalogUtil.buildFQName(context.session.getCurrentDatabase(), tableName); + qualifiedName = CatalogUtil.buildFQName(context.queryContext.getCurrentDatabase(), tableName); + } + if(qualifiedName == null) { + System.out.println("A"); } if (catalog.existsTable(qualifiedName)) { context.state.addVerification(String.format("relation \"%s\" already exists", qualifiedName)); @@ -246,7 +249,7 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <PreLogicalPlanVe if (expr.hasTableName()) { String qualifiedName = expr.getTableName(); if (TajoConstants.EMPTY_STRING.equals(CatalogUtil.extractQualifier(expr.getTableName()))) { - qualifiedName = CatalogUtil.buildFQName(context.session.getCurrentDatabase(), + qualifiedName = CatalogUtil.buildFQName(context.queryContext.getCurrentDatabase(), expr.getTableName()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java index 2daf799..432589b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java @@ -26,6 +26,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.SessionVars; import org.apache.tajo.algebra.JoinType; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.partition.PartitionMethodDesc; @@ -120,9 +121,8 @@ public class GlobalPlanner { LogicalNode inputPlan = PlannerUtil.clone(masterPlan.getLogicalPlan(), masterPlan.getLogicalPlan().getRootBlock().getRoot()); - boolean autoBroadcast = conf.getBoolVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO); - if (autoBroadcast) { - + boolean broadcastEnabled = masterPlan.getContext().getBool(SessionVars.TEST_BROADCAST_JOIN_ENABLED); + if (broadcastEnabled) { // pre-visit the master plan in order to find tables to be broadcasted // this visiting does not make any execution block and change plan. BroadcastJoinMarkCandidateVisitor markCandidateVisitor = new BroadcastJoinMarkCandidateVisitor(); @@ -268,11 +268,11 @@ public class GlobalPlanner { MasterPlan masterPlan = context.plan; ExecutionBlock currentBlock; - boolean autoBroadcast = conf.getBoolVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO); - long broadcastThreshold = conf.getLongVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_THRESHOLD); + boolean broadcastEnabled = context.getPlan().getContext().getBool(SessionVars.TEST_BROADCAST_JOIN_ENABLED); + long broadcastTableSizeLimit = context.getPlan().getContext().getLong(SessionVars.BROADCAST_TABLE_SIZE_LIMIT); // to check when the tajo.dist-query.join.broadcast.auto property is true - if (autoBroadcast && joinNode.isCandidateBroadcast()) { + if (broadcastEnabled && joinNode.isCandidateBroadcast()) { LogicalNode leftNode = joinNode.getLeftChild(); LogicalNode rightNode = joinNode.getRightChild(); @@ -293,7 +293,7 @@ public class GlobalPlanner { // Checking Left Side of Join if (ScanNode.isScanNode(leftNode)) { ScanNode scanNode = (ScanNode)leftNode; - if (joinNode.getJoinType() == JoinType.LEFT_OUTER || getTableVolume(scanNode) >= broadcastThreshold) { + if (joinNode.getJoinType() == JoinType.LEFT_OUTER || getTableVolume(scanNode) >= broadcastTableSizeLimit) { numLargeTables++; } else { leftBroadcast = true; @@ -306,7 +306,7 @@ public class GlobalPlanner { // Checking Right Side OF Join if (ScanNode.isScanNode(rightNode)) { ScanNode scanNode = (ScanNode)rightNode; - if (joinNode.getJoinType() == JoinType.RIGHT_OUTER || getTableVolume(scanNode) >= broadcastThreshold) { + if (joinNode.getJoinType() == JoinType.RIGHT_OUTER || getTableVolume(scanNode) >= broadcastTableSizeLimit) { numLargeTables++; } else { rightBroadcast = true; @@ -331,7 +331,7 @@ public class GlobalPlanner { } JoinNode broadcastJoinNode = (JoinNode)eachNode; ScanNode scanNode = broadcastJoinNode.getRightChild(); - if (getTableVolume(scanNode) < broadcastThreshold) { + if (getTableVolume(scanNode) < broadcastTableSizeLimit) { broadcastTargetScanNodes.add(scanNode); blockJoinNode = broadcastJoinNode; LOG.info("The table " + scanNode.getCanonicalName() + " (" http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java index f714758..31cb3b6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.io.IOUtils; +import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; @@ -38,6 +39,7 @@ import org.apache.tajo.storage.*; import org.apache.tajo.storage.Scanner; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.FragmentConvertor; +import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.TaskAttemptContext; @@ -111,7 +113,7 @@ public class ExternalSortExec extends SortExec { throw new PhysicalPlanningException(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT.varname + " cannot be lower than 2"); } // TODO - sort buffer and core num should be changed to use the allocated container resource. - this.sortBufferBytesNum = context.getConf().getLongVar(ConfVars.EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE) * 1048576L; + this.sortBufferBytesNum = context.getQueryContext().getLong(SessionVars.EXTSORT_BUFFER_SIZE) * StorageUnit.MB; this.allocatedCoreNum = context.getConf().getIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_THREAD_NUM); this.executorService = Executors.newFixedThreadPool(this.allocatedCoreNum); this.inMemoryTable = new ArrayList<Tuple>(100000); http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java index b1d0400..e73cc2f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java @@ -18,9 +18,9 @@ package org.apache.tajo.engine.planner.physical; +import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.planner.logical.InsertNode; import org.apache.tajo.engine.planner.logical.PersistentStoreNode; import org.apache.tajo.storage.Appender; @@ -59,7 +59,7 @@ public class StoreTableExec extends UnaryPhysicalExec { appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, createTableNode.getTableSchema(), context.getOutputPath()); } else { - String nullChar = context.getQueryContext().get(ConfVars.CSVFILE_NULL.varname, ConfVars.CSVFILE_NULL.defaultVal); + String nullChar = context.getQueryContext().get(SessionVars.NULL_CHAR); meta.putOption(StorageConstants.CSVFILE_NULL, nullChar); appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, context.getOutputPath()); @@ -77,7 +77,7 @@ public class StoreTableExec extends UnaryPhysicalExec { while((tuple = child.next()) != null) { appender.addTuple(tuple); } - + return null; } http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java index 79d6cb3..f4160e4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java @@ -19,94 +19,90 @@ package org.apache.tajo.engine.query; import org.apache.hadoop.fs.Path; -import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.ConfigKey; +import org.apache.tajo.OverridableConf; +import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.logical.NodeType; +import org.apache.tajo.master.session.Session; import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetProto; -public class QueryContext extends KeyValueSet { - public static final String COMMAND_TYPE = "tajo.query.command"; - - public static final String STAGING_DIR = "tajo.query.staging_dir"; - - public static final String USER_NAME = "tajo.query.username"; - - public static final String OUTPUT_TABLE_NAME = "tajo.query.output.table"; - public static final String OUTPUT_TABLE_PATH = "tajo.query.output.path"; - public static final String OUTPUT_PARTITIONS = "tajo.query.output.partitions"; - public static final String OUTPUT_OVERWRITE = "tajo.query.output.overwrite"; - public static final String OUTPUT_AS_DIRECTORY = "tajo.query.output.asdirectory"; +/** + * QueryContent is a overridable config, and it provides a set of various configs for a query instance. + */ +public class QueryContext extends OverridableConf { + public static enum QueryVars implements ConfigKey { + COMMAND_TYPE, + STAGING_DIR, + OUTPUT_TABLE_NAME, + OUTPUT_TABLE_PATH, + OUTPUT_PARTITIONS, + OUTPUT_OVERWRITE, + OUTPUT_AS_DIRECTORY, + OUTPUT_PER_FILE_SIZE, + ; - public static final String TRUE_VALUE = "1"; - public static final String FALSE_VALUE = "0"; + QueryVars() { + } - public QueryContext() {} + @Override + public String keyname() { + return name().toLowerCase(); + } - public QueryContext(KeyValueSetProto proto) { - super(proto); + @Override + public ConfigType type() { + return ConfigType.QUERY; + } } - public void put(TajoConf.ConfVars key, String value) { - put(key.varname, value); + public QueryContext(TajoConf conf) { + super(conf, ConfigKey.ConfigType.QUERY); } - public String get(TajoConf.ConfVars key) { - return get(key.varname); + public QueryContext(TajoConf conf, Session session) { + super(conf); + putAll(session.getAllVariables()); } - public String get(String key) { - return super.get(key); + public QueryContext(TajoConf conf, KeyValueSetProto proto) { + super(conf, proto, ConfigKey.ConfigType.QUERY); } - public void setBool(String key, boolean val) { - put(key, val ? TRUE_VALUE : FALSE_VALUE); - } + //----------------------------------------------------------------------------------------------- + // Query Config Specified Section + //----------------------------------------------------------------------------------------------- - public boolean getBool(String key) { - String strVal = get(key); - return strVal != null ? strVal.equals(TRUE_VALUE) : false; + public String getCurrentDatabase() { + return get(SessionVars.CURRENT_DATABASE); } public void setUser(String username) { - put(USER_NAME, username); + put(SessionVars.USERNAME, username); } public String getUser() { - return get(USER_NAME); + return get(SessionVars.USERNAME); } public void setStagingDir(Path path) { - put(STAGING_DIR, path.toUri().toString()); + put(QueryVars.STAGING_DIR, path.toUri().toString()); } public Path getStagingDir() { - String strVal = get(STAGING_DIR); + String strVal = get(QueryVars.STAGING_DIR); return strVal != null ? new Path(strVal) : null; } /** - * The fact that QueryContext has an output table means this query has a target table. - * In other words, this query is 'CREATE TABLE' or 'INSERT (OVERWRITE) INTO <table name>' statement. - * This config is not set if a query has INSERT (OVERWRITE) INTO LOCATION '/path/..'. - */ - public boolean hasOutputTable() { - return get(OUTPUT_TABLE_NAME) != null; - } - - /** * Set a target table name * * @param tableName The target table name */ public void setOutputTable(String tableName) { - put(OUTPUT_TABLE_NAME, tableName); - } - - public String getOutputTable() { - String strVal = get(OUTPUT_TABLE_NAME); - return strVal != null ? strVal : null; + put(QueryVars.OUTPUT_TABLE_NAME, tableName); } /** @@ -116,52 +112,64 @@ public class QueryContext extends KeyValueSet { * @return */ public boolean hasOutputPath() { - return get(OUTPUT_TABLE_PATH) != null; + return containsKey(QueryVars.OUTPUT_TABLE_PATH); } public void setOutputPath(Path path) { - put(OUTPUT_TABLE_PATH, path.toUri().toString()); + put(QueryVars.OUTPUT_TABLE_PATH, path.toUri().toString()); } public Path getOutputPath() { - String strVal = get(OUTPUT_TABLE_PATH); + String strVal = get(QueryVars.OUTPUT_TABLE_PATH); return strVal != null ? new Path(strVal) : null; } public boolean hasPartition() { - return get(OUTPUT_PARTITIONS) != null; + return containsKey(QueryVars.OUTPUT_PARTITIONS); } public void setPartitionMethod(PartitionMethodDesc partitionMethodDesc) { - put(OUTPUT_PARTITIONS, partitionMethodDesc != null ? partitionMethodDesc.toJson() : null); + put(QueryVars.OUTPUT_PARTITIONS, partitionMethodDesc != null ? partitionMethodDesc.toJson() : null); } public PartitionMethodDesc getPartitionMethod() { - return PartitionMethodDesc.fromJson(get(OUTPUT_PARTITIONS)); + return PartitionMethodDesc.fromJson(get(QueryVars.OUTPUT_PARTITIONS)); } public void setOutputOverwrite() { - setBool(OUTPUT_OVERWRITE, true); + setBool(QueryVars.OUTPUT_OVERWRITE, true); } public boolean isOutputOverwrite() { - return getBool(OUTPUT_OVERWRITE); + return getBool(QueryVars.OUTPUT_OVERWRITE); } public void setFileOutput() { - setBool(OUTPUT_AS_DIRECTORY, true); + setBool(QueryVars.OUTPUT_AS_DIRECTORY, true); + } + + public boolean containsKey(ConfigKey key) { + return containsKey(key.keyname()); + } + + public boolean equalKey(ConfigKey key, String another) { + if (containsKey(key)) { + return get(key).equals(another); + } else { + return false; + } } - public boolean isFileOutput() { - return getBool(OUTPUT_AS_DIRECTORY); + public boolean isCommandType(NodeType commandType) { + return equalKey(QueryVars.COMMAND_TYPE, commandType.name()); } public void setCommandType(NodeType nodeType) { - put(COMMAND_TYPE, nodeType.name()); + put(QueryVars.COMMAND_TYPE, nodeType.name()); } public NodeType getCommandType() { - String strVal = get(COMMAND_TYPE); + String strVal = get(QueryVars.COMMAND_TYPE); return strVal != null ? NodeType.valueOf(strVal) : null; } @@ -170,7 +178,7 @@ public class QueryContext extends KeyValueSet { } public boolean isCreateTable() { - return getCommandType() == NodeType.CREATE_TABLE; + return isCommandType(NodeType.CREATE_TABLE); } public void setInsert() { @@ -178,14 +186,6 @@ public class QueryContext extends KeyValueSet { } public boolean isInsert() { - return getCommandType() == NodeType.INSERT; - } - - public void setHiveQueryMode() { - setBool("hive.query.mode", true); - } - - public boolean isHiveQueryMode() { - return getBool("hive.query.mode"); + return isCommandType(NodeType.INSERT); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java index f1af2ff..56df48d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java @@ -19,6 +19,7 @@ package org.apache.tajo.engine.query; import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.ipc.TajoWorkerProtocol; @@ -193,7 +194,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest { if (!p.hasQueryContext()) { return null; } - this.queryContext = new QueryContext(p.getQueryContext()); + this.queryContext = new QueryContext(new TajoConf(), p.getQueryContext()); return this.queryContext; } http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index 73f3cf5..37a56ba 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -29,6 +29,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; import org.apache.tajo.QueryId; import org.apache.tajo.QueryIdFactory; +import org.apache.tajo.SessionVars; import org.apache.tajo.TajoConstants; import org.apache.tajo.algebra.AlterTablespaceSetType; import org.apache.tajo.algebra.Expr; @@ -116,40 +117,25 @@ public class GlobalEngine extends AbstractService { public SubmitQueryResponse executeQuery(Session session, String query, boolean isJson) { LOG.info("Query: " + query); - QueryContext queryContext = new QueryContext(); - queryContext.putAll(session.getAllVariables()); + QueryContext queryContext = new QueryContext(context.getConf(), session); Expr planningContext; try { if (isJson) { planningContext = buildExpressionFromJson(query); } else { - // setting environment variables - String [] cmds = query.split(" "); - if(cmds != null) { - if(cmds[0].equalsIgnoreCase("set")) { - String[] params = cmds[1].split("="); - context.getConf().set(params[0], params[1]); - SubmitQueryResponse.Builder responseBuilder = SubmitQueryResponse.newBuilder(); - responseBuilder.setUserName(context.getConf().getVar(TajoConf.ConfVars.USERNAME)); - responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); - responseBuilder.setResultCode(ClientProtos.ResultCode.OK); - return responseBuilder.build(); - } - } - planningContext = buildExpressionFromSql(queryContext, query); } String jsonExpr = planningContext.toJson(); - LogicalPlan plan = createLogicalPlan(session, planningContext); + LogicalPlan plan = createLogicalPlan(queryContext, planningContext); SubmitQueryResponse response = executeQueryInternal(queryContext, session, plan, query, jsonExpr); return response; } catch (Throwable t) { context.getSystemMetrics().counter("Query", "errorQuery").inc(); LOG.error("\nStack Trace:\n" + StringUtils.stringifyException(t)); SubmitQueryResponse.Builder responseBuilder = SubmitQueryResponse.newBuilder(); - responseBuilder.setUserName(context.getConf().getVar(TajoConf.ConfVars.USERNAME)); + responseBuilder.setUserName(queryContext.get(SessionVars.USERNAME)); responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); responseBuilder.setIsForwarded(true); responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR); @@ -183,11 +169,11 @@ public class GlobalEngine extends AbstractService { SubmitQueryResponse.Builder responseBuilder = SubmitQueryResponse.newBuilder(); responseBuilder.setIsForwarded(false); - responseBuilder.setUserName(context.getConf().getVar(TajoConf.ConfVars.USERNAME)); + responseBuilder.setUserName(queryContext.get(SessionVars.USERNAME)); if (PlannerUtil.checkIfDDLPlan(rootNode)) { context.getSystemMetrics().counter("Query", "numDDLQuery").inc(); - updateQuery(session, rootNode.getChild()); + updateQuery(queryContext, rootNode.getChild()); responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); responseBuilder.setResultCode(ClientProtos.ResultCode.OK); @@ -310,7 +296,7 @@ public class GlobalEngine extends AbstractService { } TaskAttemptContext taskAttemptContext = - new TaskAttemptContext(context.getConf(), queryContext, null, (CatalogProtos.FragmentProto[]) null, stagingDir); + new TaskAttemptContext(queryContext, null, (CatalogProtos.FragmentProto[]) null, stagingDir); taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000")); EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild()); @@ -390,7 +376,7 @@ public class GlobalEngine extends AbstractService { } - public QueryId updateQuery(Session session, String sql, boolean isJson) throws IOException, SQLException, PlanningException { + public QueryId updateQuery(QueryContext queryContext, String sql, boolean isJson) throws IOException, SQLException, PlanningException { try { LOG.info("SQL: " + sql); @@ -402,13 +388,13 @@ public class GlobalEngine extends AbstractService { expr = analyzer.parse(sql); } - LogicalPlan plan = createLogicalPlan(session, expr); + LogicalPlan plan = createLogicalPlan(queryContext, expr); LogicalRootNode rootNode = plan.getRootBlock().getRoot(); if (!PlannerUtil.checkIfDDLPlan(rootNode)) { throw new SQLException("This is not update query:\n" + sql); } else { - updateQuery(session, rootNode.getChild()); + updateQuery(queryContext, rootNode.getChild()); return QueryIdFactory.NULL_QUERY_ID; } } catch (Exception e) { @@ -417,46 +403,46 @@ public class GlobalEngine extends AbstractService { } } - private boolean updateQuery(Session session, LogicalNode root) throws IOException { + private boolean updateQuery(QueryContext queryContext, LogicalNode root) throws IOException { switch (root.getType()) { case CREATE_DATABASE: CreateDatabaseNode createDatabase = (CreateDatabaseNode) root; - createDatabase(session, createDatabase.getDatabaseName(), null, createDatabase.isIfNotExists()); + createDatabase(queryContext, createDatabase.getDatabaseName(), null, createDatabase.isIfNotExists()); return true; case DROP_DATABASE: DropDatabaseNode dropDatabaseNode = (DropDatabaseNode) root; - dropDatabase(session, dropDatabaseNode.getDatabaseName(), dropDatabaseNode.isIfExists()); + dropDatabase(queryContext, dropDatabaseNode.getDatabaseName(), dropDatabaseNode.isIfExists()); return true; case CREATE_TABLE: CreateTableNode createTable = (CreateTableNode) root; - createTable(session, createTable, createTable.isIfNotExists()); + createTable(queryContext, createTable, createTable.isIfNotExists()); return true; case DROP_TABLE: DropTableNode dropTable = (DropTableNode) root; - dropTable(session, dropTable.getTableName(), dropTable.isIfExists(), dropTable.isPurge()); + dropTable(queryContext, dropTable.getTableName(), dropTable.isIfExists(), dropTable.isPurge()); return true; case ALTER_TABLESPACE: AlterTablespaceNode alterTablespace = (AlterTablespaceNode) root; - alterTablespace(session, alterTablespace); + alterTablespace(queryContext, alterTablespace); return true; case ALTER_TABLE: AlterTableNode alterTable = (AlterTableNode) root; - alterTable(session,alterTable); + alterTable(queryContext,alterTable); return true; case TRUNCATE_TABLE: TruncateTableNode truncateTable = (TruncateTableNode) root; - truncateTable(session, truncateTable); + truncateTable(queryContext, truncateTable); return true; default: throw new InternalError("updateQuery cannot handle such query: \n" + root.toJson()); } } - private LogicalPlan createLogicalPlan(Session session, Expr expression) throws PlanningException { + private LogicalPlan createLogicalPlan(QueryContext queryContext, Expr expression) throws PlanningException { VerificationState state = new VerificationState(); - preVerifier.verify(session, state, expression); + preVerifier.verify(queryContext, state, expression); if (!state.verified()) { StringBuilder sb = new StringBuilder(); for (String error : state.getErrorMessages()) { @@ -465,19 +451,19 @@ public class GlobalEngine extends AbstractService { throw new VerifyException(sb.toString()); } - LogicalPlan plan = planner.createPlan(session, expression); + LogicalPlan plan = planner.createPlan(queryContext, expression); if (LOG.isDebugEnabled()) { LOG.debug("============================================="); LOG.debug("Non Optimized Query: \n" + plan.toString()); LOG.debug("============================================="); } LOG.info("Non Optimized Query: \n" + plan.toString()); - optimizer.optimize(session, plan); + optimizer.optimize(queryContext, plan); LOG.info("============================================="); LOG.info("Optimized Query: \n" + plan.toString()); LOG.info("============================================="); - annotatedPlanVerifier.verify(session, state, plan); + annotatedPlanVerifier.verify(queryContext, state, plan); if (!state.verified()) { StringBuilder sb = new StringBuilder(); @@ -493,7 +479,7 @@ public class GlobalEngine extends AbstractService { /** * Alter a given table */ - public void alterTablespace(final Session session, final AlterTablespaceNode alterTablespace) { + public void alterTablespace(final QueryContext queryContext, final AlterTablespaceNode alterTablespace) { final CatalogService catalog = context.getCatalog(); final String spaceName = alterTablespace.getTablespaceName(); @@ -517,7 +503,7 @@ public class GlobalEngine extends AbstractService { /** * Alter a given table */ - public void alterTable(final Session session, final AlterTableNode alterTable) throws IOException { + public void alterTable(final QueryContext queryContext, final AlterTableNode alterTable) throws IOException { final CatalogService catalog = context.getCatalog(); final String tableName = alterTable.getTableName(); @@ -529,7 +515,7 @@ public class GlobalEngine extends AbstractService { databaseName = split[0]; simpleTableName = split[1]; } else { - databaseName = session.getCurrentDatabase(); + databaseName = queryContext.getCurrentDatabase(); simpleTableName = tableName; } final String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName); @@ -572,7 +558,8 @@ public class GlobalEngine extends AbstractService { if (existColumnName(qualifiedName, alterTable.getNewColumnName())) { throw new ColumnNameAlreadyExistException(alterTable.getNewColumnName()); } - catalog.alterTable(CatalogUtil.renameColumn(qualifiedName, alterTable.getColumnName(), alterTable.getNewColumnName(), AlterTableType.RENAME_COLUMN)); + catalog.alterTable(CatalogUtil.renameColumn(qualifiedName, alterTable.getColumnName(), + alterTable.getNewColumnName(), AlterTableType.RENAME_COLUMN)); break; case ADD_COLUMN: if (existColumnName(qualifiedName, alterTable.getAddNewColumn().getSimpleName())) { @@ -588,7 +575,8 @@ public class GlobalEngine extends AbstractService { /** * Truncate table a given table */ - public void truncateTable(final Session session, final TruncateTableNode truncateTableNode) throws IOException { + public void truncateTable(final QueryContext queryContext, final TruncateTableNode truncateTableNode) + throws IOException { List<String> tableNames = truncateTableNode.getTableNames(); final CatalogService catalog = context.getCatalog(); @@ -602,7 +590,7 @@ public class GlobalEngine extends AbstractService { databaseName = split[0]; simpleTableName = split[1]; } else { - databaseName = session.getCurrentDatabase(); + databaseName = queryContext.getCurrentDatabase(); simpleTableName = eachTableName; } final String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName); @@ -641,7 +629,7 @@ public class GlobalEngine extends AbstractService { return tableDesc.getSchema().containsByName(columnName) ? true : false; } - private TableDesc createTable(Session session, CreateTableNode createTable, boolean ifNotExists) throws IOException { + private TableDesc createTable(QueryContext queryContext, CreateTableNode createTable, boolean ifNotExists) throws IOException { TableMeta meta; if (createTable.hasOptions()) { @@ -659,7 +647,7 @@ public class GlobalEngine extends AbstractService { databaseName = CatalogUtil.extractQualifier(createTable.getTableName()); tableName = CatalogUtil.extractSimpleName(createTable.getTableName()); } else { - databaseName = session.getCurrentDatabase(); + databaseName = queryContext.getCurrentDatabase(); tableName = createTable.getTableName(); } @@ -668,11 +656,11 @@ public class GlobalEngine extends AbstractService { createTable.setPath(tablePath); } - return createTableOnPath(session, createTable.getTableName(), createTable.getTableSchema(), + return createTableOnPath(queryContext, createTable.getTableName(), createTable.getTableSchema(), meta, createTable.getPath(), createTable.isExternal(), createTable.getPartitionMethod(), ifNotExists); } - public TableDesc createTableOnPath(Session session, String tableName, Schema schema, TableMeta meta, + public TableDesc createTableOnPath(QueryContext queryContext, String tableName, Schema schema, TableMeta meta, Path path, boolean isExternal, PartitionMethodDesc partitionDesc, boolean ifNotExists) throws IOException { @@ -683,7 +671,7 @@ public class GlobalEngine extends AbstractService { databaseName = splitted[0]; simpleTableName = splitted[1]; } else { - databaseName = session.getCurrentDatabase(); + databaseName = queryContext.getCurrentDatabase(); simpleTableName = tableName; } String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName); @@ -736,7 +724,7 @@ public class GlobalEngine extends AbstractService { } } - public boolean createDatabase(@Nullable Session session, String databaseName, + public boolean createDatabase(@Nullable QueryContext queryContext, String databaseName, @Nullable String tablespace, boolean ifNotExists) throws IOException { @@ -768,7 +756,7 @@ public class GlobalEngine extends AbstractService { return true; } - public boolean dropDatabase(Session session, String databaseName, boolean ifExists) { + public boolean dropDatabase(QueryContext queryContext, String databaseName, boolean ifExists) { boolean exists = catalog.existDatabase(databaseName); if(!exists) { @@ -780,7 +768,7 @@ public class GlobalEngine extends AbstractService { } } - if (session.getCurrentDatabase().equals(databaseName)) { + if (queryContext.getCurrentDatabase().equals(databaseName)) { throw new RuntimeException("ERROR: Cannot drop the current open database"); } @@ -795,7 +783,7 @@ public class GlobalEngine extends AbstractService { * @param tableName to be dropped * @param purge Remove all data if purge is true. */ - public boolean dropTable(Session session, String tableName, boolean ifExists, boolean purge) { + public boolean dropTable(QueryContext queryContext, String tableName, boolean ifExists, boolean purge) { CatalogService catalog = context.getCatalog(); String databaseName; @@ -805,7 +793,7 @@ public class GlobalEngine extends AbstractService { databaseName = splitted[0]; simpleTableName = splitted[1]; } else { - databaseName = session.getCurrentDatabase(); + databaseName = queryContext.getCurrentDatabase(); simpleTableName = tableName; } String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName); http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 97f59ef..7d80a88 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -34,6 +34,7 @@ import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.ClientProtos.*; import org.apache.tajo.ipc.TajoMasterClientProtocol; @@ -193,7 +194,8 @@ public class TajoMasterClientService extends AbstractService { } @Override - public BoolProto existSessionVariable(RpcController controller, SessionedStringProto request) throws ServiceException { + public BoolProto existSessionVariable(RpcController controller, SessionedStringProto request) + throws ServiceException { try { String value = context.getSessionManager().getVariable(request.getSessionId().getId(), request.getValue()); if (value != null) { @@ -278,9 +280,16 @@ public class TajoMasterClientService extends AbstractService { try { Session session = context.getSessionManager().getSession(request.getSessionId().getId()); + QueryContext queryContext = new QueryContext(conf, session); + if (queryContext.getCurrentDatabase() == null) { + for (Map.Entry<String,String> e : queryContext.getAllKeyValus().entrySet()) { + System.out.println(e.getKey() + "=" + e.getValue()); + } + } + UpdateQueryResponse.Builder builder = UpdateQueryResponse.newBuilder(); try { - context.getGlobalEngine().updateQuery(session, request.getQuery(), request.getIsJson()); + context.getGlobalEngine().updateQuery(queryContext, request.getQuery(), request.getIsJson()); builder.setResultCode(ResultCode.OK); return builder.build(); } catch (Exception e) { @@ -539,7 +548,9 @@ public class TajoMasterClientService extends AbstractService { public BoolProto createDatabase(RpcController controller, SessionedStringProto request) throws ServiceException { try { Session session = context.getSessionManager().getSession(request.getSessionId().getId()); - if (context.getGlobalEngine().createDatabase(session, request.getValue(), null, false)) { + QueryContext queryContext = new QueryContext(conf, session); + + if (context.getGlobalEngine().createDatabase(queryContext, request.getValue(), null, false)) { return BOOL_TRUE; } else { return BOOL_FALSE; @@ -567,8 +578,9 @@ public class TajoMasterClientService extends AbstractService { public BoolProto dropDatabase(RpcController controller, SessionedStringProto request) throws ServiceException { try { Session session = context.getSessionManager().getSession(request.getSessionId().getId()); + QueryContext queryContext = new QueryContext(conf, session); - if (context.getGlobalEngine().dropDatabase(session, request.getValue(), false)) { + if (context.getGlobalEngine().dropDatabase(queryContext, request.getValue(), false)) { return BOOL_TRUE; } else { return BOOL_FALSE; @@ -605,6 +617,10 @@ public class TajoMasterClientService extends AbstractService { tableName = request.getValue(); } + if (databaseName == null) { + System.out.println("A"); + } + if (catalog.existsTable(databaseName, tableName)) { return BOOL_TRUE; } else { @@ -672,6 +688,7 @@ public class TajoMasterClientService extends AbstractService { throws ServiceException { try { Session session = context.getSessionManager().getSession(request.getSessionId().getId()); + QueryContext queryContext = new QueryContext(conf, session); Path path = new Path(request.getPath()); FileSystem fs = path.getFileSystem(conf); @@ -689,7 +706,7 @@ public class TajoMasterClientService extends AbstractService { TableDesc desc; try { - desc = context.getGlobalEngine().createTableOnPath(session, request.getName(), schema, + desc = context.getGlobalEngine().createTableOnPath(queryContext, request.getName(), schema, meta, path, true, partitionDesc, false); } catch (Exception e) { return TableResponse.newBuilder() @@ -715,7 +732,9 @@ public class TajoMasterClientService extends AbstractService { public BoolProto dropTable(RpcController controller, DropTableRequest dropTable) throws ServiceException { try { Session session = context.getSessionManager().getSession(dropTable.getSessionId().getId()); - context.getGlobalEngine().dropTable(session, dropTable.getName(), false, dropTable.getPurge()); + QueryContext queryContext = new QueryContext(conf, session); + + context.getGlobalEngine().dropTable(queryContext, dropTable.getName(), false, dropTable.getPurge()); return BOOL_TRUE; } catch (Throwable t) { throw new ServiceException(t); http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java index 8bb3dde..8111ef6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.state.*; import org.apache.hadoop.yarn.util.Clock; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryId; +import org.apache.tajo.SessionVars; import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoProtos.QueryState; import org.apache.tajo.catalog.CatalogService; @@ -38,7 +39,6 @@ import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.ExecutionBlockCursor; @@ -636,7 +636,7 @@ public class Query implements EventHandler<QueryEvent> { SubQuery lastStage = query.getSubQuery(finalExecBlockId); TableMeta meta = lastStage.getTableMeta(); - String nullChar = queryContext.get(ConfVars.CSVFILE_NULL.varname, ConfVars.CSVFILE_NULL.defaultVal); + String nullChar = queryContext.get(SessionVars.NULL_CHAR); meta.putOption(StorageConstants.CSVFILE_NULL, nullChar); TableStats stats = lastStage.getResultStats(); http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java index 25af82f..aed69b2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java @@ -29,10 +29,12 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tajo.QueryId; +import org.apache.tajo.SessionVars; import org.apache.tajo.TajoIdProtos; import org.apache.tajo.TajoProtos; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.GlobalPlanner; +import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.TajoAsyncDispatcher; @@ -83,6 +85,8 @@ public class QueryMaster extends CompositeService implements EventHandler { private QueryMasterContext queryMasterContext; + private QueryContext queryContext; + private QueryHeartbeatThread queryHeartbeatThread; private FinishedQueryMasterTaskCleanThread finishedQueryMasterTaskCleanThread; @@ -362,12 +366,9 @@ public class QueryMaster extends CompositeService implements EventHandler { try { queryMasterTask.stop(); - //if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE") - // && !workerContext.isYarnContainerMode()) { - if (!getContext().getConf().getBoolVar(TajoConf.ConfVars.TAJO_DEBUG)) { + if (!queryContext.getBool(SessionVars.DEBUG_ENABLED)) { cleanup(queryId); } - //} } catch (Exception e) { LOG.error(e.getMessage(), e); } @@ -408,6 +409,8 @@ public class QueryMaster extends CompositeService implements EventHandler { queryMasterTask.start(); } + queryContext = event.getQueryContext(); + synchronized(queryMasterTasks) { queryMasterTasks.put(event.getQueryId(), queryMasterTask); } http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java index f52d143..ec975d8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java @@ -204,7 +204,6 @@ public class QueryMasterManagerService extends CompositeService try { QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask( new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId())); - // queryMaster terminated by internal error before task has not done if (queryMasterTask != null) { queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report)); } @@ -239,7 +238,8 @@ public class QueryMasterManagerService extends CompositeService LOG.info("Receive executeQuery request:" + queryId); queryMaster.handle(new QueryStartEvent(queryId, new Session(request.getSession()), - new QueryContext(request.getQueryContext()), request.getExprInJson().getValue(), + new QueryContext(workerContext.getQueryMaster().getContext().getConf(), + request.getQueryContext()), request.getExprInJson().getValue(), request.getLogicalPlanJson().getValue())); done.run(TajoWorker.TRUE_PROTO); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index 071e5d4..5885a1d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -328,8 +328,8 @@ public class QueryMasterTask extends CompositeService { LogicalPlanner planner = new LogicalPlanner(catalog); LogicalOptimizer optimizer = new LogicalOptimizer(systemConf); Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class); - LogicalPlan plan = planner.createPlan(session, expr); - optimizer.optimize(session, plan); + LogicalPlan plan = planner.createPlan(queryContext, expr); + optimizer.optimize(queryContext, plan); GlobalEngine.DistributedQueryHookManager hookManager = new GlobalEngine.DistributedQueryHookManager(); hookManager.addHook(new GlobalEngine.InsertHook()); http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java index fa1ed4c..940170c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java @@ -25,12 +25,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.SessionVars; import org.apache.tajo.algebra.JoinType; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.catalog.statistics.StatisticsUtil; import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.planner.PlannerUtil; import org.apache.tajo.engine.planner.PlanningException; import org.apache.tajo.engine.planner.RangePartitionAlgorithm; @@ -50,6 +50,7 @@ import org.apache.tajo.storage.RowStoreUtil; import org.apache.tajo.storage.TupleRange; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.Pair; +import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.TUtil; import org.apache.tajo.util.TajoIdUtils; import org.apache.tajo.worker.FetchImpl; @@ -373,8 +374,7 @@ public class Repartitioner { // Getting the desire number of join tasks according to the volumn // of a larger table int largerIdx = stats[0] >= stats[1] ? 0 : 1; - int desireJoinTaskVolumn = subQuery.getContext().getConf(). - getIntVar(ConfVars.DIST_QUERY_JOIN_TASK_VOLUME); + int desireJoinTaskVolumn = subQuery.getMasterPlan().getContext().getInt(SessionVars.JOIN_TASK_INPUT_SIZE); // calculate the number of tasks according to the data size int mb = (int) Math.ceil((double) stats[largerIdx] / 1048576); @@ -858,17 +858,17 @@ public class Repartitioner { // Scattered hash shuffle hashes the key columns and groups the hash keys associated with // the same hash key. Then, if the volume of a group is larger - // than DIST_QUERY_TABLE_PARTITION_VOLUME, it divides the group into more than two sub groups - // according to DIST_QUERY_TABLE_PARTITION_VOLUME (default size = 256MB). + // than $DIST_QUERY_TABLE_PARTITION_VOLUME, it divides the group into more than two sub groups + // according to $DIST_QUERY_TABLE_PARTITION_VOLUME (default size = 256MB). // As a result, each group size always becomes the less than or equal - // to DIST_QUERY_TABLE_PARTITION_VOLUME. Finally, each subgroup is assigned to a query unit. + // to $DIST_QUERY_TABLE_PARTITION_VOLUME. Finally, each subgroup is assigned to a query unit. // It is usually used for writing partitioned tables. public static void scheduleScatteredHashShuffleFetches(TaskSchedulerContext schedulerContext, SubQuery subQuery, Map<ExecutionBlockId, List<IntermediateEntry>> intermediates, String tableName) { int i = 0; - long splitVolume = ((long) 1048576) * subQuery.getContext().getConf(). - getIntVar(ConfVars.DIST_QUERY_TABLE_PARTITION_VOLUME); // in bytes + long splitVolume = StorageUnit.MB * + subQuery.getMasterPlan().getContext().getLong(SessionVars.TABLE_PARTITION_PER_SHUFFLE_SIZE); long sumNumBytes = 0L; Map<Integer, List<FetchImpl>> fetches = new HashMap<Integer, List<FetchImpl>>(); http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java index 17efa21..b6fe9da 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java @@ -32,10 +32,7 @@ import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.state.*; import org.apache.hadoop.yarn.util.Records; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryIdFactory; -import org.apache.tajo.QueryUnitId; -import org.apache.tajo.TajoIdProtos; +import org.apache.tajo.*; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; @@ -742,7 +739,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { LOG.info(subQuery.getId() + ", Bigger Table's volume is approximately " + mb + " MB"); int taskNum = (int) Math.ceil((double) mb / - conf.getIntVar(ConfVars.DIST_QUERY_JOIN_PARTITION_VOLUME)); + conf.getIntVar(ConfVars.$DIST_QUERY_JOIN_PARTITION_VOLUME)); int totalMem = getClusterTotalMemory(subQuery); LOG.info(subQuery.getId() + ", Total memory of cluster is " + totalMem + " MB"); @@ -750,8 +747,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> { // determine the number of task taskNum = Math.min(taskNum, slots); - if (conf.getIntVar(ConfVars.TESTCASE_MIN_TASK_NUM) > 0) { - taskNum = conf.getIntVar(ConfVars.TESTCASE_MIN_TASK_NUM); + if (conf.getIntVar(ConfVars.$TEST_MIN_TASK_NUM) > 0) { + taskNum = conf.getIntVar(ConfVars.$TEST_MIN_TASK_NUM); LOG.warn("!!!!! TESTCASE MODE !!!!!"); } @@ -795,7 +792,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { LOG.info(subQuery.getId() + ", Table's volume is approximately " + mb + " MB"); // determine the number of task int taskNumBySize = (int) Math.ceil((double) mb / - conf.getIntVar(ConfVars.DIST_QUERY_GROUPBY_PARTITION_VOLUME)); + conf.getIntVar(ConfVars.$DIST_QUERY_GROUPBY_PARTITION_VOLUME)); int totalMem = getClusterTotalMemory(subQuery); @@ -1110,7 +1107,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { stopScheduler(); releaseContainers(); - if (!getContext().getConf().getBoolVar(TajoConf.ConfVars.TAJO_DEBUG)) { + if (!getContext().getQueryContext().getBool(SessionVars.DEBUG_ENABLED)) { List<ExecutionBlock> childs = getMasterPlan().getChilds(getId()); List<TajoIdProtos.ExecutionBlockIdProto> ebIds = Lists.newArrayList(); for (ExecutionBlock executionBlock : childs){
