Repository: tajo Updated Branches: refs/heads/branch-0.11.1 62fc9f8e7 -> 0c9b6a6e9
http://git-wip-us.apache.org/repos/asf/tajo/blob/0c9b6a6e/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index 7f18fd4..c088a8b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -94,7 +94,7 @@ public class GlobalEngine extends AbstractService { preVerifier = new PreLogicalPlanVerifier(context.getCatalog()); planner = new LogicalPlanner(context.getCatalog(), TablespaceManager.getInstance()); // Access path rewriter is enabled only in QueryMasterTask - optimizer = new LogicalOptimizer(context.getConf(), context.getCatalog()); + optimizer = new LogicalOptimizer(context.getConf(), context.getCatalog(), TablespaceManager.getInstance()); annotatedPlanVerifier = new LogicalPlanVerifier(); postLogicalPlanVerifier = new PostLogicalPlanVerifier(); } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/tajo/blob/0c9b6a6e/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java b/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java index b8f2caf..f28da58 100644 --- a/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java +++ b/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java @@ -112,7 +112,8 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> { } else { value = null; } - return new SetSession(ctx.name.getText(), value); + // Keep upper case letters (workaround temporarily) + return new SetSession(ctx.name.getText().toUpperCase(), value); } else if (checkIfExist(ctx.TIME()) && checkIfExist(ctx.ZONE())) { 
http://git-wip-us.apache.org/repos/asf/tajo/blob/0c9b6a6e/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java index b010f1c..741b7ca 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java @@ -319,7 +319,7 @@ public class QueryMasterTask extends CompositeService { LOG.info(SessionVars.INDEX_ENABLED.keyname() + " : " + queryContext.getBool(SessionVars.INDEX_ENABLED)); CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog(); LogicalPlanner planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); - LogicalOptimizer optimizer = new LogicalOptimizer(systemConf, catalog); + LogicalOptimizer optimizer = new LogicalOptimizer(systemConf, catalog, TablespaceManager.getInstance()); Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class); jsonExpr = null; // remove the possible OOM http://git-wip-us.apache.org/repos/asf/tajo/blob/0c9b6a6e/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java index 96617d1..bb8494c 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java @@ -42,6 +42,7 @@ import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext; import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleProvider; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; +import 
org.apache.tajo.storage.StorageService; import org.apache.tajo.util.ReflectionUtil; import org.apache.tajo.util.TUtil; import org.apache.tajo.util.graph.DirectedGraphCursor; @@ -58,14 +59,17 @@ import static org.apache.tajo.plan.joinorder.GreedyHeuristicJoinOrderAlgorithm.g public class LogicalOptimizer { private static final Log LOG = LogFactory.getLog(LogicalOptimizer.class.getName()); - private CatalogService catalog; + private final CatalogService catalog; + private final StorageService storage; private BaseLogicalPlanRewriteEngine rulesBeforeJoinOpt; private BaseLogicalPlanRewriteEngine rulesAfterToJoinOpt; private JoinOrderAlgorithm joinOrderAlgorithm = new GreedyHeuristicJoinOrderAlgorithm(); - public LogicalOptimizer(TajoConf conf, CatalogService catalog) { + public LogicalOptimizer(TajoConf conf, CatalogService catalog, StorageService storage) { this.catalog = catalog; + this.storage = storage; + // TODO: set the catalog instance to FilterPushdownRule Class clazz = conf.getClassVar(ConfVars.LOGICAL_PLAN_REWRITE_RULE_PROVIDER_CLASS); LogicalPlanRewriteRuleProvider provider = (LogicalPlanRewriteRuleProvider) ReflectionUtil.newInstance(clazz, conf); @@ -84,7 +88,7 @@ public class LogicalOptimizer { } public LogicalNode optimize(OverridableConf context, LogicalPlan plan) throws TajoException { - rulesBeforeJoinOpt.rewrite(new LogicalPlanRewriteRuleContext(context, plan, catalog)); + rulesBeforeJoinOpt.rewrite(new LogicalPlanRewriteRuleContext(context, plan, catalog, storage)); DirectedGraphCursor<String, BlockEdge> blockCursor = new DirectedGraphCursor<String, BlockEdge>(plan.getQueryBlockGraph(), plan.getRootBlock().getName()); @@ -97,7 +101,7 @@ public class LogicalOptimizer { } else { LOG.info("Skip join order optimization"); } - rulesAfterToJoinOpt.rewrite(new LogicalPlanRewriteRuleContext(context, plan, catalog)); + rulesAfterToJoinOpt.rewrite(new LogicalPlanRewriteRuleContext(context, plan, catalog, storage)); return plan.getRootBlock().getRoot(); } 
http://git-wip-us.apache.org/repos/asf/tajo/blob/0c9b6a6e/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java index d4f7813..2bdec36 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java @@ -1336,7 +1336,6 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex QueryBlock block = context.queryBlock; ScanNode scanNode = block.getNodeFromExpr(expr); - updatePhysicalInfo(scanNode.getTableDesc()); // Find expression which can be evaluated at this relation node. // Except for column references, additional expressions used in select list, where clause, order-by clauses @@ -1395,22 +1394,6 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex return targets; } - private void updatePhysicalInfo(TableDesc desc) { - - // FAKEFILE is used for test - if (!desc.getMeta().getDataFormat().equals("SYSTEM") && !desc.getMeta().getDataFormat().equals("FAKEFILE")) { - try { - if (desc.getStats() != null) { - desc.getStats().setNumBytes(storage.getTableVolumn(desc.getUri())); - } - } catch (UnsupportedException t) { - LOG.warn(desc.getName() + " does not support Tablespace::getTableVolume()"); - // -1 means unknown volume size. 
- desc.getStats().setNumBytes(-1); - } - } - } - @Override public TableSubQueryNode visitTableSubQuery(PlanContext context, Stack<Expr> stack, TablePrimarySubQuery expr) throws TajoException { http://git-wip-us.apache.org/repos/asf/tajo/blob/0c9b6a6e/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseLogicalPlanRewriteEngine.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseLogicalPlanRewriteEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseLogicalPlanRewriteEngine.java index 3e30e3f..2022c2b 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseLogicalPlanRewriteEngine.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseLogicalPlanRewriteEngine.java @@ -20,7 +20,6 @@ package org.apache.tajo.plan.rewrite; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.tajo.OverridableConf; import org.apache.tajo.exception.TajoException; import org.apache.tajo.plan.LogicalPlan; http://git-wip-us.apache.org/repos/asf/tajo/blob/0c9b6a6e/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseLogicalPlanRewriteRuleProvider.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseLogicalPlanRewriteRuleProvider.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseLogicalPlanRewriteRuleProvider.java index 120529c..3776f62 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseLogicalPlanRewriteRuleProvider.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseLogicalPlanRewriteRuleProvider.java @@ -47,6 +47,9 @@ public class BaseLogicalPlanRewriteRuleProvider extends LogicalPlanRewriteRulePr rules.add(FilterPushDownRule.class); } + // for updating table stats + rules.add(TableStatUpdateRewriter.class); + return rules; } 
http://git-wip-us.apache.org/repos/asf/tajo/blob/0c9b6a6e/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/LogicalPlanRewriteRuleContext.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/LogicalPlanRewriteRuleContext.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/LogicalPlanRewriteRuleContext.java index 6c43112..565b14b 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/LogicalPlanRewriteRuleContext.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/LogicalPlanRewriteRuleContext.java @@ -21,22 +21,24 @@ package org.apache.tajo.plan.rewrite; import org.apache.tajo.OverridableConf; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.storage.StorageService; public class LogicalPlanRewriteRuleContext { - private OverridableConf queryContext; - private LogicalPlan plan; + private final OverridableConf queryContext; + private final LogicalPlan plan; private CatalogService catalog; + private StorageService storage; - public LogicalPlanRewriteRuleContext(OverridableConf queryContext, LogicalPlan plan) { - setQueryContext(queryContext); - setPlan(plan); + public LogicalPlanRewriteRuleContext(OverridableConf context, LogicalPlan plan) { + this.queryContext = context; + this.plan = plan; } - public LogicalPlanRewriteRuleContext(OverridableConf queryContext, LogicalPlan plan, CatalogService catalog) { - setQueryContext(queryContext); - setPlan(plan); - setCatalog(catalog); + public LogicalPlanRewriteRuleContext(OverridableConf context, LogicalPlan plan, CatalogService catalog, StorageService storage) { + this(context, plan); + this.catalog = catalog; + this.storage = storage; } public void setCatalog(CatalogService catalog) { @@ -51,15 +53,11 @@ public class LogicalPlanRewriteRuleContext { return queryContext; } - public void setQueryContext(OverridableConf queryContext) { - 
this.queryContext = queryContext; - } - public LogicalPlan getPlan() { return plan; } - public void setPlan(LogicalPlan plan) { - this.plan = plan; + public StorageService getStorage() { + return storage; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/0c9b6a6e/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java new file mode 100644 index 0000000..9c6f285 --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.plan.rewrite; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.OverridableConf; +import org.apache.tajo.SessionVars; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; +import org.apache.tajo.storage.StorageService; + +import java.util.Stack; + +/** + * Logical plan rewrite rule that refreshes the byte size ({@code numBytes}) of scanned tables + * from the {@link StorageService}. + * + * <p>For every scan whose table data format is not {@code SYSTEM}, the table volume is fetched when + * the session variable {@code USE_TABLE_VOLUME} is enabled, or when the recorded size is zero. + * When the storage service reports the lookup as unsupported, the size is set to -1 (unknown). + */ +public class TableStatUpdateRewriter implements LogicalPlanRewriteRule { + private static final Log LOG = LogFactory.getLog(TableStatUpdateRewriter.class); + + private static final String NAME = "Table Stat Updater"; + + @Override + public String getName() { + return NAME; + } + + /** This rule is applied to every plan. */ + @Override + public boolean isEligible(LogicalPlanRewriteRuleContext context) { + return true; + } + + /** + * Visits the plan starting from its root block and updates the stats of scanned tables in place. + * + * @param context rewrite context supplying the plan, the query context and the storage service + * @return the same plan instance that was passed in via the context + */ + @Override + public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws TajoException { + LogicalPlan plan = context.getPlan(); + LogicalPlan.QueryBlock rootBlock = plan.getRootBlock(); + + Rewriter r = new Rewriter(context.getQueryContext(), context.getStorage()); + r.visit(rootBlock, plan, rootBlock, rootBlock.getRoot(), new Stack<LogicalNode>()); + return plan; + } + + /** + * Plan visitor performing the per-scan stat update. It is a static nested class because it + * needs no state from the enclosing rule instance. + */ + private static final class Rewriter extends BasicLogicalPlanVisitor<Object, Object> { + private final OverridableConf conf; + private final StorageService storage; + + private Rewriter(OverridableConf conf, StorageService storage) { + this.conf = conf; + this.storage = storage; + } + + @Override + public Object visitScan(Object object, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode scanNode, + Stack<LogicalNode> stack) throws TajoException { + final TableDesc table = scanNode.getTableDesc(); + + if (!isVirtual(table)) {
+ final TableStats stats = getTableStat(table); + final long tableSize = stats.getNumBytes(); + + // Refresh the volume through the storage service when USE_TABLE_VOLUME is set, and also + // when the recorded size is zero: a cheap workaround to avoid suboptimal join orders + // caused by a zero-size statistic. + if (conf.getBool(SessionVars.USE_TABLE_VOLUME) || tableSize == 0) { + stats.setNumBytes(getTableVolume(table)); + } + } + + return scanNode; + } + + /** Returns the table's stats, first attaching a new empty {@link TableStats} if it has none. */ + private TableStats getTableStat(TableDesc table) { + TableStats stats = table.getStats(); + if (stats == null) { + stats = new TableStats(); + table.setStats(stats); + } + return stats; + } + + /** Tables in the {@code SYSTEM} data format are treated as virtual and are not updated. */ + private boolean isVirtual(TableDesc table) { + return table.getMeta().getDataFormat().equals("SYSTEM"); + } + + /** + * Asks the storage service for the volume of the table's data. + * + * @return the volume reported by the storage service, or -1 (unknown) if the lookup is unsupported + */ + private long getTableVolume(TableDesc table) { + try { + return storage.getTableVolumn(table.getUri()); + } catch (UnsupportedException t) { + LOG.warn(table.getName() + " does not support Tablespace::getTableVolume()"); + // -1 means unknown volume size. + return -1; + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/0c9b6a6e/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/TestPgSQLQueryTests.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/TestPgSQLQueryTests.java b/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/TestPgSQLQueryTests.java index e82a0df..52b7531 100644 --- a/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/TestPgSQLQueryTests.java +++ b/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/TestPgSQLQueryTests.java @@ -86,12 +86,13 @@ public class TestPgSQLQueryTests extends QueryTestCaseBase { @Option(sort = true) public void testTPCH_Q2_Part_MixedStorage() throws
Exception { // Manually enable broadcast feature + try { testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname, "true"); testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD.varname, "" + (5 * 1024)); testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD.varname, - "" + (2 * 1024)); + 1024 * 1024 + ""); runSimpleTests(); @@ -101,7 +102,7 @@ public class TestPgSQLQueryTests extends QueryTestCaseBase { testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD.varname, TajoConf.ConfVars.$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD.defaultVal); testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD.varname, - TajoConf.ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD.defaultVal); + 1024 * 1024 + ""); } }
