IMPALA-5602: Fix query optimization for Kudu and datasource tables. Fix a bug where the following queries on Kudu and datasource tables were incorrectly optimized as a 'small query' and therefore ran on a single node with a single scanner thread:
(A) queries that have all their predicates pushed to the underlying storage layer and have a limit; (B) queries on tables with missing table stats that also meet the conditions in (A). Testing: Added frontend planner tests. Change-Id: I93822d67ebda41d5d0456095c429e3915a3f40c4 Reviewed-on: http://gerrit.cloudera.org:8080/7560 Reviewed-by: Matthew Jacobs <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6f20df81 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6f20df81 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6f20df81 Branch: refs/heads/master Commit: 6f20df81f727f89dfb1efef9a86f39cf5a4ef88a Parents: 81c3d88 Author: Bikramjeet Vig <[email protected]> Authored: Tue Aug 1 16:34:15 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Thu Aug 24 02:32:13 2017 +0000 ---------------------------------------------------------------------- .../org/apache/impala/catalog/KuduTable.java | 44 ++++++++++------- .../impala/planner/DataSourceScanNode.java | 3 ++ .../apache/impala/planner/HBaseScanNode.java | 3 ++ .../org/apache/impala/planner/KuduScanNode.java | 3 ++ .../org/apache/impala/planner/ScanNode.java | 16 +++++- .../impala/util/MaxRowsProcessedVisitor.java | 5 +- .../apache/impala/common/FrontendTestBase.java | 52 +++++++++++++------- .../org/apache/impala/planner/PlannerTest.java | 3 ++ .../queries/PlannerTest/data-source-tables.test | 15 ++++++ .../queries/PlannerTest/kudu.test | 27 ++++++++++ 10 files changed, 131 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/fe/src/main/java/org/apache/impala/catalog/KuduTable.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java 
b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java index cb94503..7e13ac5 100644 --- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java @@ -182,6 +182,30 @@ public class KuduTable extends Table { } /** + * Load schema and partitioning schemes directly from Kudu. + */ + public void loadSchemaFromKudu() throws ImpalaRuntimeException { + // This is set to 0 for Kudu tables. + // TODO: Change this to reflect the number of pk columns and modify all the + // places (e.g. insert stmt) that currently make use of this parameter. + numClusteringCols_ = 0; + org.apache.kudu.client.KuduTable kuduTable = null; + // Connect to Kudu to retrieve table metadata + KuduClient kuduClient = KuduUtil.getKuduClient(getKuduMasterHosts()); + try { + kuduTable = kuduClient.openTable(kuduTableName_); + } catch (KuduException e) { + throw new ImpalaRuntimeException( + String.format("Error opening Kudu table '%s', Kudu error: %s", kuduTableName_, + e.getMessage())); + } + Preconditions.checkNotNull(kuduTable); + + loadSchema(kuduTable); + loadPartitionByParams(kuduTable); + } + + /** * Loads the metadata of a Kudu table. * * Schema and partitioning schemes are loaded directly from Kudu whereas column stats @@ -192,32 +216,14 @@ public class KuduTable extends Table { public void load(boolean dummy /* not used */, IMetaStoreClient msClient, org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException { msTable_ = msTbl; - // This is set to 0 for Kudu tables. - // TODO: Change this to reflect the number of pk columns and modify all the - // places (e.g. insert stmt) that currently make use of this parameter. 
- numClusteringCols_ = 0; kuduTableName_ = msTable_.getParameters().get(KuduTable.KEY_TABLE_NAME); Preconditions.checkNotNull(kuduTableName_); kuduMasters_ = msTable_.getParameters().get(KuduTable.KEY_MASTER_HOSTS); Preconditions.checkNotNull(kuduMasters_); - org.apache.kudu.client.KuduTable kuduTable = null; setTableStats(msTable_); - - // Connect to Kudu to retrieve table metadata - KuduClient kuduClient = KuduUtil.getKuduClient(getKuduMasterHosts()); - try { - kuduTable = kuduClient.openTable(kuduTableName_); - } catch (KuduException e) { - throw new TableLoadingException(String.format( - "Error opening Kudu table '%s', Kudu error: %s", - kuduTableName_, e.getMessage())); - } - Preconditions.checkNotNull(kuduTable); - // Load metadata from Kudu and HMS try { - loadSchema(kuduTable); - loadPartitionByParams(kuduTable); + loadSchemaFromKudu(); loadAllColumnStats(msClient); } catch (ImpalaRuntimeException e) { throw new TableLoadingException("Error loading metadata for Kudu table " + http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java index cea9b53..e6679da 100644 --- a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java @@ -362,4 +362,7 @@ public class DataSourceScanNode extends ScanNode { } return output.toString(); } + + @Override + public boolean hasStorageLayerConjuncts() { return !acceptedConjuncts_.isEmpty(); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java ---------------------------------------------------------------------- diff --git 
a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java index d56aa98..d2e47ad 100644 --- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java @@ -508,4 +508,7 @@ public class HBaseScanNode extends ScanNode { // TODO: What's a good estimate of memory consumption? return 1024L * 1024L * 1024L; } + + @Override + public boolean hasStorageLayerConjuncts() { return !filters_.isEmpty(); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java index 37a4e5c..cbc132b 100644 --- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java @@ -521,4 +521,7 @@ public class KuduScanNode extends ScanNode { default: return null; } } + + @Override + public boolean hasStorageLayerConjuncts() { return !kuduConjuncts_.isEmpty(); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/fe/src/main/java/org/apache/impala/planner/ScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java b/fe/src/main/java/org/apache/impala/planner/ScanNode.java index 1373e89..d6b7813 100644 --- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java @@ -193,7 +193,9 @@ abstract public class ScanNode extends PlanNode { @Override public long getInputCardinality() { - if (getConjuncts().isEmpty() && hasLimit()) return getLimit(); + if (!hasScanConjuncts() && !hasStorageLayerConjuncts() && hasLimit()) { + return getLimit(); + } 
return inputCardinality_; } @@ -210,4 +212,16 @@ abstract public class ScanNode extends PlanNode { return desc_.getPath().toString(); } } + + /** + * Returns true if this node has conjuncts to be evaluated by Impala against the scan + * tuple. + */ + public boolean hasScanConjuncts() { return !getConjuncts().isEmpty(); } + + /** + * Returns true if this node has conjuncts to be evaluated by the underlying storage + * engine. + */ + public boolean hasStorageLayerConjuncts() { return false; } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java b/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java index 56cf047..e338ecb 100644 --- a/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java +++ b/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java @@ -52,8 +52,9 @@ public class MaxRowsProcessedVisitor implements Visitor<PlanNode> { boolean missingStats = scan.isTableMissingStats() || scan.hasCorruptTableStats(); // In the absence of collection stats, treat scans on collections as if they // have no limit. 
- if (scan.isAccessingCollectionType() - || (missingStats && !(scan.hasLimit() && scan.getConjuncts().isEmpty()))) { + if (scan.isAccessingCollectionType() || + (missingStats && !(scan.hasLimit() && !scan.hasScanConjuncts() && + !scan.hasStorageLayerConjuncts()))) { valid_ = false; return; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java index 033b0e2..aa96490 100644 --- a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java +++ b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java @@ -44,6 +44,7 @@ import org.apache.impala.catalog.Db; import org.apache.impala.catalog.Function; import org.apache.impala.catalog.HdfsTable; import org.apache.impala.catalog.ImpaladCatalog; +import org.apache.impala.catalog.KuduTable; import org.apache.impala.catalog.ScalarFunction; import org.apache.impala.catalog.ScalarType; import org.apache.impala.catalog.Table; @@ -165,9 +166,9 @@ public class FrontendTestBase { } /** - * Add a new dummy table to the catalog based on the given CREATE TABLE sql. - * The dummy table only has the column definitions and the metastore table set, but no - * other metadata. + * Add a new dummy table to the catalog based on the given CREATE TABLE sql. The + * returned table only has its metadata partially set, but is capable of being planned. + * Only HDFS tables and external Kudu tables are supported. * Returns the new dummy table. * The test tables are registered in testTables_ and removed in the @After method. 
*/ @@ -177,21 +178,36 @@ public class FrontendTestBase { Preconditions.checkNotNull(db, "Test tables must be created in an existing db."); org.apache.hadoop.hive.metastore.api.Table msTbl = CatalogOpExecutor.createMetaStoreTable(createTableStmt.toThrift()); - HdfsTable dummyTable = new HdfsTable(msTbl, db, - createTableStmt.getTbl(), createTableStmt.getOwner()); - List<ColumnDef> columnDefs = Lists.newArrayList( - createTableStmt.getPartitionColumnDefs()); - dummyTable.setNumClusteringCols(columnDefs.size()); - columnDefs.addAll(createTableStmt.getColumnDefs()); - for (int i = 0; i < columnDefs.size(); ++i) { - ColumnDef colDef = columnDefs.get(i); - dummyTable.addColumn(new Column(colDef.getColName(), colDef.getType(), i)); - } - try { - dummyTable.addDefaultPartition(msTbl.getSd()); - } catch (CatalogException e) { - e.printStackTrace(); - fail("Failed to add test table:\n" + createTableSql); + Table dummyTable = Table.fromMetastoreTable(db, msTbl); + if (dummyTable instanceof HdfsTable) { + List<ColumnDef> columnDefs = Lists.newArrayList( + createTableStmt.getPartitionColumnDefs()); + dummyTable.setNumClusteringCols(columnDefs.size()); + columnDefs.addAll(createTableStmt.getColumnDefs()); + for (int i = 0; i < columnDefs.size(); ++i) { + ColumnDef colDef = columnDefs.get(i); + dummyTable.addColumn(new Column(colDef.getColName(), colDef.getType(), i)); + } + try { + HdfsTable hdfsTable = (HdfsTable) dummyTable; + hdfsTable.addDefaultPartition(msTbl.getSd()); + } catch (CatalogException e) { + e.printStackTrace(); + fail("Failed to add test table:\n" + createTableSql); + } + } else if (dummyTable instanceof KuduTable) { + if (!Table.isExternalTable(msTbl)) { + fail("Failed to add table, external kudu table expected:\n" + createTableSql); + } + try { + KuduTable kuduTable = (KuduTable) dummyTable; + kuduTable.loadSchemaFromKudu(); + } catch (ImpalaRuntimeException e) { + e.printStackTrace(); + fail("Failed to add test table:\n" + createTableSql); + } + } else { + 
fail("Test table type not supported:\n" + createTableSql); } db.addTable(dummyTable); testTables_.add(dummyTable); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/fe/src/test/java/org/apache/impala/planner/PlannerTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index 3bb8083..fc8ceab 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -314,6 +314,9 @@ public class PlannerTest extends PlannerTestBase { @Test public void testKudu() { Assume.assumeTrue(RuntimeEnv.INSTANCE.isKuduSupported()); + addTestDb("kudu_planner_test", "Test DB for Kudu Planner."); + addTestTable("CREATE EXTERNAL TABLE kudu_planner_test.no_stats STORED AS KUDU " + + "TBLPROPERTIES ('kudu.table_name' = 'impala::functional_kudu.alltypes');"); runPlannerTestFile("kudu"); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test b/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test index 7cc5c04..ce4dbd7 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test @@ -96,3 +96,18 @@ PLAN-ROOT SINK | 00:EMPTYSET ==== +---- QUERY +# IMPALA-5602: If a query contains predicates that are all pushed to the datasource and +# there is a limit, then the query should not incorrectly run with 'small query' +# optimization. 
+select * from functional.alltypes_datasource where id = 1 limit 15 +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +01:EXCHANGE [UNPARTITIONED] +| limit: 15 +| +00:SCAN DATA SOURCE [functional.alltypes_datasource] +data source predicates: id = 1 + limit: 15 +==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test index 079d291..e620ad6 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test @@ -424,3 +424,30 @@ INSERT INTO KUDU [functional_kudu.alltypes] 00:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB ==== +# IMPALA-5602: If a query contains predicates that are all pushed to kudu and there is a +# limit, then the query should not incorrectly run with 'small query' optimization. +select * from functional_kudu.alltypesagg where tinyint_col = 9 limit 10; +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +01:EXCHANGE [UNPARTITIONED] +| limit: 10 +| +00:SCAN KUDU [functional_kudu.alltypesagg_idx] + kudu predicates: functional_kudu.alltypesagg_idx.tinyint_col = 9 + limit: 10 +==== +# IMPALA-5602: If a query contains predicates that are all pushed to kudu, there is a +# limit, and no table stats, then the query should not incorrectly run with 'small query' +# optimization. +select * from kudu_planner_test.no_stats where tinyint_col = 9 limit 10; +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +01:EXCHANGE [UNPARTITIONED] +| limit: 10 +| +00:SCAN KUDU [kudu_planner_test.no_stats] + kudu predicates: tinyint_col = 9 + limit: 10 +====
