IMPALA-4572: Run COMPUTE STATS on Parquet tables with MT_DOP=4. COMPUTE STATS on Parquet tables is run with MT_DOP=4 by default. COMPUTE STATS on non-Parquet tables will run without MT_DOP.
Users can always override the behavior by setting MT_DOP manually. Setting MT_DOP to 0 means a statement will be run in the conventional execution mode (without intra-node parallelism based on multiple fragment instances). Users can set a higher MT_DOP even for Parquet tables. Testing: Added a new test that checks the effective MT_DOP. Locally ran test_mt_dop.py, test_scanners.py, test_nested_types.py, test_compute_stats.py, and test_cancellation.py. Change-Id: I2be3c7c9f3004e9a759224a2e5756eb6e4efa359 Reviewed-on: http://gerrit.cloudera.org:8080/5315 Reviewed-by: Alex Behm <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7efa0831 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7efa0831 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7efa0831 Branch: refs/heads/master Commit: 7efa08316ecb8f73d1c968ed602d11d40c714a1f Parents: 6662c55 Author: Alex Behm <[email protected]> Authored: Thu Dec 1 13:58:19 2016 -0800 Committer: Internal Jenkins <[email protected]> Committed: Sat Dec 3 22:28:53 2016 +0000 ---------------------------------------------------------------------- common/thrift/ImpalaInternalService.thrift | 3 +- .../impala/analysis/ComputeStatsStmt.java | 20 ++++++++ .../org/apache/impala/planner/HdfsScanNode.java | 3 +- .../impala/planner/SingleNodePlanner.java | 10 ++-- .../org/apache/impala/service/Frontend.java | 18 ++++++- .../org/apache/impala/planner/PlannerTest.java | 52 ++++++++++++++++++++ .../apache/impala/planner/PlannerTestBase.java | 4 +- .../org/apache/impala/service/FrontendTest.java | 4 +- tests/query_test/test_mt_dop.py | 4 +- 9 files changed, 107 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/common/thrift/ImpalaInternalService.thrift 
---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index 4f81e27..f18947a 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -191,7 +191,8 @@ struct TQueryOptions { // query per backend. // > 0: multi-threaded execution mode, with given dop // 0: single-threaded execution mode - 44: optional i32 mt_dop = 0 + // unset: may be set automatically to > 0 in createExecRequest(), otherwise same as 0 + 44: optional i32 mt_dop // If true, INSERT writes to S3 go directly to their final location rather than being // copied there by the coordinator. We cannot do this for INSERT OVERWRITES because for http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java index 84b866b..90c46a8 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java @@ -17,6 +17,7 @@ package org.apache.impala.analysis; +import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -24,6 +25,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.impala.authorization.Privilege; import org.apache.impala.catalog.Column; import org.apache.impala.catalog.HBaseTable; +import org.apache.impala.catalog.HdfsFileFormat; import org.apache.impala.catalog.HdfsPartition; import org.apache.impala.catalog.HdfsTable; import org.apache.impala.catalog.Table; @@ -528,6 +530,24 @@ public class ComputeStatsStmt extends StatementBase { public String getTblStatsQuery() { return tableStatsQueryStr_; } public String 
getColStatsQuery() { return columnStatsQueryStr_; } + /** + * Returns true if this statement computes stats on Parquet partitions only, + * false otherwise. + */ + public boolean isParquetOnly() { + if (!(table_ instanceof HdfsTable)) return false; + Collection<HdfsPartition> affectedPartitions = null; + if (partitionSet_ != null) { + affectedPartitions = partitionSet_.getPartitions(); + } else { + affectedPartitions = ((HdfsTable) table_).getPartitions(); + } + for (HdfsPartition partition: affectedPartitions) { + if (partition.getFileFormat() != HdfsFileFormat.PARQUET) return false; + } + return true; + } + @Override public String toSql() { if (!isIncremental_) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index 0aee399..9642b97 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -173,7 +173,8 @@ public class HdfsScanNode extends ScanNode { // Determine backend scan node implementation to use. The optimized MT implementation // is currently only supported for Parquet. 
- if (analyzer.getQueryOptions().mt_dop > 0 && + if (analyzer.getQueryOptions().isSetMt_dop() && + analyzer.getQueryOptions().mt_dop > 0 && fileFormats.size() == 1 && fileFormats.contains(HdfsFileFormat.PARQUET)) { useMtScanNode_ = true; } else { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java index 1634dd2..4bc8a88 100644 --- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java @@ -158,14 +158,18 @@ public class SingleNodePlanner { * Throws a NotImplementedException if plan validation fails. */ public void validatePlan(PlanNode planNode) throws NotImplementedException { - if (ctx_.getQueryOptions().mt_dop > 0 && !RuntimeEnv.INSTANCE.isTestEnv() + if (ctx_.getQueryOptions().isSetMt_dop() && ctx_.getQueryOptions().mt_dop > 0 + && !RuntimeEnv.INSTANCE.isTestEnv() && (planNode instanceof JoinNode || ctx_.hasTableSink())) { throw new NotImplementedException( "MT_DOP not supported for plans with base table joins or table sinks."); } - // As long as MT_DOP == 0 any join can run in a single-node plan. - if (ctx_.isSingleNodeExec() && ctx_.getQueryOptions().mt_dop == 0) return; + // As long as MT_DOP is unset or 0 any join can run in a single-node plan. 
+ if (ctx_.isSingleNodeExec() && + (!ctx_.getQueryOptions().isSetMt_dop() || ctx_.getQueryOptions().mt_dop == 0)) { + return; + } if (planNode instanceof NestedLoopJoinNode) { JoinNode joinNode = (JoinNode) planNode; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/fe/src/main/java/org/apache/impala/service/Frontend.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index e7eabb1..c98ba49 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -114,6 +114,7 @@ import org.apache.impala.thrift.TPlanExecInfo; import org.apache.impala.thrift.TPlanFragment; import org.apache.impala.thrift.TQueryCtx; import org.apache.impala.thrift.TQueryExecRequest; +import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TResetMetadataRequest; import org.apache.impala.thrift.TResultRow; import org.apache.impala.thrift.TResultSet; @@ -978,8 +979,9 @@ public class Frontend { Planner planner, StringBuilder explainString) throws ImpalaException { TQueryCtx queryCtx = planner.getQueryCtx(); AnalysisContext.AnalysisResult analysisResult = planner.getAnalysisResult(); - boolean isMtExec = - analysisResult.isQueryStmt() && queryCtx.request.query_options.mt_dop > 0; + boolean isMtExec = analysisResult.isQueryStmt() && + queryCtx.request.query_options.isSetMt_dop() && + queryCtx.request.query_options.mt_dop > 0; List<PlanFragment> planRoots = Lists.newArrayList(); TQueryExecRequest result = new TQueryExecRequest(); @@ -1038,6 +1040,7 @@ public class Frontend { result.setAccess_events(analysisResult.getAccessEvents()); result.analysis_warnings = analysisResult.getAnalyzer().getWarnings(); + TQueryOptions queryOptions = queryCtx.request.query_options; if (analysisResult.isCatalogOp()) { result.stmt_type = TStmtType.DDL; 
createCatalogOpRequest(analysisResult, result); @@ -1045,6 +1048,15 @@ public class Frontend { if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) { result.catalog_op_request.setLineage_graph(thriftLineageGraph); } + // Set MT_DOP=4 for COMPUTE STATS on Parquet tables, unless the user has already + // provided another value for MT_DOP. + if (!queryOptions.isSetMt_dop() && + analysisResult.isComputeStatsStmt() && + analysisResult.getComputeStatsStmt().isParquetOnly()) { + queryOptions.setMt_dop(4); + } + // If unset, set MT_DOP to 0 to simplify the rest of the code. + if (!queryOptions.isSetMt_dop()) queryOptions.setMt_dop(0); // All DDL operations except for CTAS are done with analysis at this point. if (!analysisResult.isCreateTableAsSelectStmt()) return result; } else if (analysisResult.isLoadDataStmt()) { @@ -1061,6 +1073,8 @@ public class Frontend { result.setSet_query_option_request(analysisResult.getSetStmt().toThrift()); return result; } + // If unset, set MT_DOP to 0 to simplify the rest of the code. 
+ if (!queryOptions.isSetMt_dop()) queryOptions.setMt_dop(0); // create TQueryExecRequest Preconditions.checkState(analysisResult.isQueryStmt() || analysisResult.isDmlStmt() http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/fe/src/test/java/org/apache/impala/planner/PlannerTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index dce32a6..8c48ee4 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -17,14 +17,22 @@ package org.apache.impala.planner; +import org.apache.impala.catalog.Catalog; import org.apache.impala.catalog.Db; +import org.apache.impala.common.ImpalaException; import org.apache.impala.common.RuntimeEnv; +import org.apache.impala.testutil.TestUtils; +import org.apache.impala.thrift.TExecRequest; import org.apache.impala.thrift.TExplainLevel; +import org.apache.impala.thrift.TQueryCtx; import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TRuntimeFilterMode; +import org.junit.Assert; import org.junit.Assume; import org.junit.Test; +import jline.internal.Preconditions; + // All planner tests, except for S3 specific tests should go here. public class PlannerTest extends PlannerTestBase { @@ -308,4 +316,48 @@ public class PlannerTest extends PlannerTestBase { RuntimeEnv.INSTANCE.setTestEnv(true); } } + + @Test + public void testComputeStatsMtDop() { + for (int mtDop: new int[] {-1, 0, 1, 16}) { + int effectiveMtDop = (mtDop != -1) ? mtDop : 0; + // MT_DOP is not set automatically for stmt other than COMPUTE STATS. + testEffectiveMtDop( + "select * from functional_parquet.alltypes", mtDop, effectiveMtDop); + // MT_DOP is not set automatically for COMPUTE STATS on non-Parquet tables. 
+ testEffectiveMtDop( + "compute stats functional.alltypes", mtDop, effectiveMtDop); + } + // MT_DOP is set automatically for COMPUTE STATS on Parquet tables, + // but can be overridden by a user-provided MT_DOP. + testEffectiveMtDop("compute stats functional_parquet.alltypes", -1, 4); + testEffectiveMtDop("compute stats functional_parquet.alltypes", 0, 0); + testEffectiveMtDop("compute stats functional_parquet.alltypes", 1, 1); + testEffectiveMtDop("compute stats functional_parquet.alltypes", 16, 16); + } + + /** + * Creates an exec request for 'stmt' setting the MT_DOP query option to 'userMtDop', + * or leaving it unset if 'userMtDop' is -1. Asserts that the MT_DOP of the generated + * exec request is equal to 'expectedMtDop'. + */ + private void testEffectiveMtDop(String stmt, int userMtDop, int expectedMtDop) { + TQueryCtx queryCtx = TestUtils.createQueryContext( + Catalog.DEFAULT_DB, System.getProperty("user.name")); + queryCtx.request.setStmt(stmt); + queryCtx.request.query_options = defaultQueryOptions(); + if (userMtDop != -1) queryCtx.request.query_options.setMt_dop(userMtDop); + StringBuilder explainBuilder = new StringBuilder(); + TExecRequest request = null; + try { + request = frontend_.createExecRequest(queryCtx, explainBuilder); + } catch (ImpalaException e) { + Assert.fail("Failed to create exec request for '" + stmt + "': " + e.getMessage()); + } + Preconditions.checkNotNull(request); + int actualMtDop = -1; + if (request.query_options.isSetMt_dop()) actualMtDop = request.query_options.mt_dop; + // Check that the effective MT_DOP is as expected. 
+ Assert.assertEquals(actualMtDop, expectedMtDop); + } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java index 4fff233..5e6dbc7 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java @@ -364,7 +364,7 @@ public class PlannerTestBase extends FrontendTestBase { /** * Merge the options of b into a and return a */ - private TQueryOptions mergeQueryOptions(TQueryOptions a, TQueryOptions b) { + protected TQueryOptions mergeQueryOptions(TQueryOptions a, TQueryOptions b) { for(TQueryOptions._Fields f : TQueryOptions._Fields.values()) { if (b.isSet(f)) { a.setFieldValue(f, b.getFieldValue(f)); @@ -484,7 +484,7 @@ public class PlannerTestBase extends FrontendTestBase { ImpalaInternalServiceConstants.NUM_NODES_ALL); } if (section == Section.PARALLELPLANS) { - queryCtx.request.query_options.mt_dop = 2; + queryCtx.request.query_options.setMt_dop(2); } ArrayList<String> expectedPlan = testCase.getSectionContents(section); boolean sectionExists = expectedPlan != null && !expectedPlan.isEmpty(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/fe/src/test/java/org/apache/impala/service/FrontendTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/service/FrontendTest.java b/fe/src/test/java/org/apache/impala/service/FrontendTest.java index dfbdb12..dd6a6c8 100644 --- a/fe/src/test/java/org/apache/impala/service/FrontendTest.java +++ b/fe/src/test/java/org/apache/impala/service/FrontendTest.java @@ -32,7 +32,6 @@ import org.apache.hive.service.cli.thrift.TGetInfoReq; import 
org.apache.hive.service.cli.thrift.TGetSchemasReq; import org.apache.hive.service.cli.thrift.TGetTablesReq; import org.junit.Test; - import org.apache.impala.analysis.AuthorizationTest; import org.apache.impala.authorization.AuthorizationConfig; import org.apache.impala.catalog.Catalog; @@ -41,11 +40,14 @@ import org.apache.impala.common.AnalysisException; import org.apache.impala.common.ImpalaException; import org.apache.impala.testutil.ImpaladTestCatalog; import org.apache.impala.testutil.TestUtils; +import org.apache.impala.thrift.TExecRequest; import org.apache.impala.thrift.TMetadataOpRequest; import org.apache.impala.thrift.TMetadataOpcode; import org.apache.impala.thrift.TQueryCtx; +import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TResultRow; import org.apache.impala.thrift.TResultSet; + import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/tests/query_test/test_mt_dop.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_mt_dop.py b/tests/query_test/test_mt_dop.py index ff60b60..1d522fd 100644 --- a/tests/query_test/test_mt_dop.py +++ b/tests/query_test/test_mt_dop.py @@ -25,7 +25,9 @@ from tests.common.skip import SkipIfOldAggsJoins from tests.common.test_vector import TestDimension from tests.common.test_vector import TestVector -MT_DOP_VALUES = [1, 2, 8] +# COMPUTE STATS on Parquet tables automatically sets MT_DOP=4, so include +# the value 0 to cover the non-MT path as well. +MT_DOP_VALUES = [0, 1, 2, 8] class TestMtDop(ImpalaTestSuite): @classmethod
