Repository: drill
Updated Branches:
  refs/heads/master c6dbe6a2f -> 03e8f9f3e

DRILL-4826: Query against INFORMATION_SCHEMA.TABLES degrades as the number of views increases

This closes #592


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/03e8f9f3
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/03e8f9f3
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/03e8f9f3

Branch: refs/heads/master
Commit: 03e8f9f3e01c56a9411bb4333e4851c92db6e410
Parents: c6dbe6a
Author: Parth Chandra <[email protected]>
Authored: Wed Aug 3 23:02:01 2016 -0700
Committer: Parth Chandra <[email protected]>
Committed: Thu Oct 27 17:36:00 2016 -0700

----------------------------------------------------------------------
 .../store/hive/DrillHiveMetaStoreClient.java    | 26 +++++++++++
 .../store/hive/schema/HiveDatabaseSchema.java   | 32 +++++++------
 .../exec/hive/TestInfoSchemaOnHiveStorage.java  | 34 ++++++++++++++
 .../org/apache/drill/exec/ExecConstants.java    |  8 ++++
 .../server/options/SystemOptionManager.java     |  1 +
 .../apache/drill/exec/store/AbstractSchema.java | 21 ++++++++-
 .../exec/store/dfs/WorkspaceSchemaFactory.java  | 47 ++++++++++++++++++++
 .../ischema/InfoSchemaRecordGenerator.java      | 31 +++++++------
 .../apache/drill/jdbc/test/TestJdbcQuery.java   |  2 +-
 9 files changed, 168 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/03e8f9f3/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/DrillHiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/DrillHiveMetaStoreClient.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/DrillHiveMetaStoreClient.java
index bbc1c70..2fe291b 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/DrillHiveMetaStoreClient.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/DrillHiveMetaStoreClient.java
@@ -21,7 +21,10 @@ import com.google.common.base.Strings;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import org.apache.calcite.schema.Schema;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.util.ImpersonationUtil;
@@ -233,6 +236,29 @@ public abstract class DrillHiveMetaStoreClient extends HiveMetaStoreClient {
     }
   }
 
+  public static List<Table> getTablesByNamesByBulkLoadHelper(
+      final HiveMetaStoreClient mClient, final List<String> tableNames, final String schemaName,
+      final int bulkSize) {
+    final int totalTables = tableNames.size();
+    final List<org.apache.hadoop.hive.metastore.api.Table> tables = Lists.newArrayList();
+
+    // In each round, Drill asks for a sub-list of all the requested tables
+    for (int fromIndex = 0; fromIndex < totalTables; fromIndex += bulkSize) {
+      final int toIndex = Math.min(fromIndex + bulkSize, totalTables);
+      final List<String> eachBulkofTableNames = tableNames.subList(fromIndex, toIndex);
+      List<org.apache.hadoop.hive.metastore.api.Table> eachBulkofTables;
+      // Retries once if the first call to fetch the metadata fails
+      try {
+        eachBulkofTables = DrillHiveMetaStoreClient.getTableObjectsByNameHelper(mClient, schemaName, eachBulkofTableNames);
+      } catch (Exception e) {
+        logger.warn("Exception occurred while trying to read tables from {}: {}", schemaName, e.getCause());
+        return ImmutableList.of();
+      }
+      tables.addAll(eachBulkofTables);
+    }
+    return tables;
+  }
+
   /** Helper method which gets table metadata. Retries once if the first call to fetch the metadata fails */
   protected static HiveReadEntry getHiveReadEntryHelper(final IMetaStoreClient mClient, final String dbName,
       final String tableName) throws TException {


http://git-wip-us.apache.org/repos/asf/drill/blob/03e8f9f3/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveDatabaseSchema.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveDatabaseSchema.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveDatabaseSchema.java
index d07a69d..90f30d8 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveDatabaseSchema.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveDatabaseSchema.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.hive.schema;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.calcite.rel.type.RelDataType;
@@ -78,32 +79,26 @@ public class HiveDatabaseSchema extends AbstractSchema{
   }
 
   @Override
-  public List<Pair<String, ? extends Table>> getTablesByNamesByBulkLoad(final List<String> tableNames) {
+  public List<Pair<String, ? extends Table>> getTablesByNamesByBulkLoad(final List<String> tableNames,
+      final int bulkSize) {
     final String schemaName = getName();
-    final List<Pair<String, ? extends Table>> tableNameToTable = Lists.newArrayList();
-    List<org.apache.hadoop.hive.metastore.api.Table> tables;
-    try {
-      tables = DrillHiveMetaStoreClient.getTableObjectsByNameHelper(mClient, schemaName, tableNames);
-    } catch (TException e) {
-      logger.warn("Exception occurred while trying to list tables by names from {}: {}", schemaName, e.getCause());
-      return tableNameToTable;
-    }
+    final List<org.apache.hadoop.hive.metastore.api.Table> tables = DrillHiveMetaStoreClient
+        .getTablesByNamesByBulkLoadHelper(mClient, tableNames, schemaName, bulkSize);
 
-    for(final org.apache.hadoop.hive.metastore.api.Table table : tables) {
-      if(table == null) {
+    final List<Pair<String, ? extends Table>> tableNameToTable = Lists.newArrayList();
+    for (final org.apache.hadoop.hive.metastore.api.Table table : tables) {
+      if (table == null) {
         continue;
       }
 
       final String tableName = table.getTableName();
       final TableType tableType;
-      if(table.getTableType().equals(org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW.toString())) {
+      if (table.getTableType().equals(org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW.toString())) {
         tableType = TableType.VIEW;
       } else {
         tableType = TableType.TABLE;
       }
 
-      tableNameToTable.add(Pair.of(
-          tableName,
-          new HiveTableWithoutStatisticAndRowType(tableType)));
+      tableNameToTable.add(Pair.of(tableName, new HiveTableWithoutStatisticAndRowType(tableType)));
     }
 
     return tableNameToTable;
   }
@@ -117,12 +112,14 @@ public class HiveDatabaseSchema extends AbstractSchema{
 
     @Override
     public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-      throw new UnsupportedOperationException("RowType was not retrieved when this table had been being requested");
+      throw new UnsupportedOperationException(
+          "RowType was not retrieved when this table had been being requested");
     }
 
    @Override
    public Statistic getStatistic() {
-      throw new UnsupportedOperationException("Statistic was not retrieved when this table had been being requested");
+      throw new UnsupportedOperationException(
+          "Statistic was not retrieved when this table had been being requested");
    }
 
    @Override
@@ -130,4 +127,5 @@ public class HiveDatabaseSchema extends AbstractSchema{
       return tableType;
     }
   }
+
 }


http://git-wip-us.apache.org/repos/asf/drill/blob/03e8f9f3/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
index a8c6e68..0a94867 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
@@ -223,4 +223,38 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase {
         .baselineValues("2", " key_2")
         .go();
   }
+
+  @Test // DRILL-4577
+  public void showInfoSchema() throws Exception {
+    final String query = "select * \n" +
+        "from INFORMATION_SCHEMA.`TABLES` \n" +
+        "where TABLE_SCHEMA like 'hive%'";
+
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("TABLE_CATALOG", "TABLE_SCHEMA", "TABLE_NAME", "TABLE_TYPE")
+        .baselineValues("DRILL", "hive.db1", "kv_db1", "TABLE")
+        .baselineValues("DRILL", "hive.db1", "avro", "TABLE")
+        .baselineValues("DRILL", "hive.default", "kv", "TABLE")
+        .baselineValues("DRILL", "hive.default", "empty_table", "TABLE")
+        .baselineValues("DRILL", "hive.default", "readtest", "TABLE")
+        .baselineValues("DRILL", "hive.default", "infoschematest", "TABLE")
+        .baselineValues("DRILL", "hive.default", "readtest_parquet", "TABLE")
+        .baselineValues("DRILL", "hive.default", "hiveview", "VIEW")
+        .baselineValues("DRILL", "hive.default", "partition_pruning_test", "TABLE")
+        .baselineValues("DRILL", "hive.default", "kv_parquet", "TABLE")
+        .baselineValues("DRILL", "hive.default", "countstar_parquet", "TABLE")
+        .baselineValues("DRILL", "hive.default", "kv_sh", "TABLE")
+        .baselineValues("DRILL", "hive.default", "simple_json", "TABLE")
+        .baselineValues("DRILL", "hive.skipper", "kv_text_small", "TABLE")
+        .baselineValues("DRILL", "hive.skipper", "kv_text_large", "TABLE")
+        .baselineValues("DRILL", "hive.skipper", "kv_incorrect_skip_header", "TABLE")
+        .baselineValues("DRILL", "hive.skipper", "kv_incorrect_skip_footer", "TABLE")
+        .baselineValues("DRILL", "hive.skipper", "kv_rcfile_large", "TABLE")
+        .baselineValues("DRILL", "hive.skipper", "kv_parquet_large", "TABLE")
+        .baselineValues("DRILL", "hive.skipper", "kv_sequencefile_large", "TABLE")
+        .go();
+  }
+
 }


http://git-wip-us.apache.org/repos/asf/drill/blob/03e8f9f3/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 027c942..053311f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -307,6 +307,14 @@ public interface ExecConstants {
   BooleanValidator ENABLE_BULK_LOAD_TABLE_LIST = new BooleanValidator(ENABLE_BULK_LOAD_TABLE_LIST_KEY, false);
 
   /**
+   * When getting Hive Table information with exec.enable_bulk_load_table_list set to true,
+   * use the exec.bulk_load_table_list.bulk_size to determine how many tables to fetch from HiveMetaStore
+   * at a time. (The number of tables can get to be quite large.)
+   */
+  String BULK_LOAD_TABLE_LIST_BULK_SIZE_KEY = "exec.bulk_load_table_list.bulk_size";
+  PositiveLongValidator BULK_LOAD_TABLE_LIST_BULK_SIZE = new PositiveLongValidator(BULK_LOAD_TABLE_LIST_BULK_SIZE_KEY, Integer.MAX_VALUE, 1000);
+
+  /**
    * Option whose value is a comma separated list of admin usernames. Admin users are users who have special privileges
    * such as changing system options.
    */


http://git-wip-us.apache.org/repos/asf/drill/blob/03e8f9f3/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index d43c868..71ebd7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -145,6 +145,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       ClassTransformer.SCALAR_REPLACEMENT_VALIDATOR,
       ExecConstants.ENABLE_NEW_TEXT_READER,
       ExecConstants.ENABLE_BULK_LOAD_TABLE_LIST,
+      ExecConstants.BULK_LOAD_TABLE_LIST_BULK_SIZE,
       ExecConstants.WEB_LOGS_MAX_LINES_VALIDATOR,
       ExecConstants.IMPLICIT_FILENAME_COLUMN_LABEL_VALIDATOR,
       ExecConstants.IMPLICIT_SUFFIX_COLUMN_LABEL_VALIDATOR,


http://git-wip-us.apache.org/repos/asf/drill/blob/03e8f9f3/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
index f7ec3fe..7a16d0a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
@@ -208,7 +208,7 @@ public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer,
    * @param tableNames the requested tables, specified by the table names
    * @return the collection of requested tables
    */
-  public List<Pair<String, ? extends Table>> getTablesByNamesByBulkLoad(final List<String> tableNames) {
+  public List<Pair<String, ? extends Table>> getTablesByNamesByBulkLoad(final List<String> tableNames, int bulkSize) {
     return getTablesByNames(tableNames);
   }
 
@@ -231,4 +231,21 @@ public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer,
     }
     return tables;
   }
-}
\ No newline at end of file
+
+  public List<Pair<String, Schema.TableType>> getTableNamesAndTypes(boolean bulkLoad, int bulkSize) {
+    final List<String> tableNames = Lists.newArrayList(getTableNames());
+    final List<Pair<String, Schema.TableType>> tableNamesAndTypes = Lists.newArrayList();
+    final List<Pair<String, ? extends Table>> tables;
+    if (bulkLoad) {
+      tables = getTablesByNamesByBulkLoad(tableNames, bulkSize);
+    } else {
+      tables = getTablesByNames(tableNames);
+    }
+    for (Pair<String, ? extends Table> table : tables) {
+      tableNamesAndTypes.add(Pair.of(table.getKey(), table.getValue().getJdbcTableType()));
+    }
+
+    return tableNamesAndTypes;
+  }
+
+}


http://git-wip-us.apache.org/repos/asf/drill/blob/03e8f9f3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index 1623463..dac313b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -29,6 +29,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
@@ -41,6 +42,7 @@ import org.apache.calcite.schema.FunctionParameter;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.schema.TableMacro;
 import org.apache.calcite.schema.TranslatableTable;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.config.LogicalPlanPersistence;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
@@ -738,5 +740,50 @@ public class WorkspaceSchemaFactory {
             .build(logger);
       }
     }
+
+    @Override
+    public List<Pair<String, TableType>> getTableNamesAndTypes(boolean bulkLoad, int bulkSize) {
+      final List<Pair<String, TableType>> tableNamesAndTypes = Lists.newArrayList();
+
+      // Look for raw tables first
+      if (!tables.isEmpty()) {
+        for (Map.Entry<TableInstance, DrillTable> tableEntry : tables.entrySet()) {
+          tableNamesAndTypes
+              .add(Pair.of(tableEntry.getKey().sig.name, tableEntry.getValue().getJdbcTableType()));
+        }
+      }
+      // Then look for files that start with this name and end in .drill.
+      List<DotDrillFile> files = Collections.emptyList();
+      try {
+        files = DotDrillUtil.getDotDrills(fs, new Path(config.getLocation()), DotDrillType.VIEW);
+      } catch (AccessControlException e) {
+        if (!schemaConfig.getIgnoreAuthErrors()) {
+          logger.debug(e.getMessage());
+          throw UserException.permissionError(e)
+            .message("Not authorized to list or query tables in schema [%s]", getFullSchemaName())
+            .build(logger);
+        }
+      } catch (IOException e) {
+        logger.warn("Failure while trying to list view tables in workspace [{}]", getFullSchemaName(), e);
+      } catch (UnsupportedOperationException e) {
+        // the file system (e.g. the classpath filesystem) may not support listing
+        // of files. But see getViews(), it ignores the exception and continues
+        logger.debug("Failure while trying to list view tables in workspace [{}]", getFullSchemaName(), e);
+      }
+
+      try {
+        for (DotDrillFile f : files) {
+          if (f.getType() == DotDrillType.VIEW) {
+            tableNamesAndTypes.add(Pair.of(f.getBaseName(), TableType.VIEW));
+          }
+        }
+      } catch (UnsupportedOperationException e) {
+        logger.debug("The filesystem for this workspace does not support this operation.", e);
+      }
+
+      return tableNamesAndTypes;
+    }
+
   }
 
 }


http://git-wip-us.apache.org/repos/asf/drill/blob/03e8f9f3/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
index 0d31c3c..aee3dc1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
@@ -55,6 +55,7 @@ import com.google.common.collect.Lists;
  * schema, table or field.
  */
 public abstract class InfoSchemaRecordGenerator<S> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InfoSchemaRecordGenerator.class);
 
   protected InfoSchemaFilter filter;
   protected OptionManager optionManager;
@@ -295,26 +296,28 @@ public abstract class InfoSchemaRecordGenerator<S> {
     @Override
     public void visitTables(String schemaPath, SchemaPlus schema) {
       final AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
+      final List<Pair<String, TableType>> tableNamesAndTypes = drillSchema
+          .getTableNamesAndTypes(optionManager.getOption(ExecConstants.ENABLE_BULK_LOAD_TABLE_LIST),
+              (int)optionManager.getOption(ExecConstants.BULK_LOAD_TABLE_LIST_BULK_SIZE));
 
-      final List<String> tableNames = Lists.newArrayList(schema.getTableNames());
-      final List<Pair<String, ? extends Table>> tableNameToTables;
-      if(optionManager.getOption(ExecConstants.ENABLE_BULK_LOAD_TABLE_LIST)) {
-        tableNameToTables = drillSchema.getTablesByNamesByBulkLoad(tableNames);
-      } else {
-        tableNameToTables = drillSchema.getTablesByNames(tableNames);
-      }
-
-      for(Pair<String, ? extends Table> tableNameToTable : tableNameToTables) {
-        final String tableName = tableNameToTable.getKey();
-        final Table table = tableNameToTable.getValue();
-        final TableType tableType = tableNameToTable.getValue().getJdbcTableType();
+      for (Pair<String, TableType> tableNameAndType : tableNamesAndTypes) {
+        final String tableName = tableNameAndType.getKey();
+        final TableType tableType = tableNameAndType.getValue();
 
         // Visit the table, and if requested ...
-        if(shouldVisitTable(schemaPath, tableName, tableType)) {
-          visitTable(schemaPath, tableName, table);
+        if (shouldVisitTable(schemaPath, tableName, tableType)) {
+          visitTableWithType(schemaPath, tableName, tableType);
         }
       }
     }
 
+    private void visitTableWithType(String schemaName, String tableName, TableType type) {
+      Preconditions
+          .checkNotNull(type, "Error. Type information for table %s.%s provided is null.", schemaName,
+              tableName);
+      records.add(new Records.Table(IS_CATALOG_NAME, schemaName, tableName, type.toString()));
+      return;
+    }
+
     @Override
     public boolean visitTable(String schemaName, String tableName, Table table) {
       Preconditions.checkNotNull(table, "Error. Table %s.%s provided is null.", schemaName, tableName);


http://git-wip-us.apache.org/repos/asf/drill/blob/03e8f9f3/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
index ee56487..bff620e 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
@@ -126,7 +126,7 @@ public class TestJdbcQuery extends JdbcTestQueryBase {
   public void testSimilarNotSimilar() throws Exception{
     JdbcAssert.withNoDefaultSchema()
       .sql("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.`TABLES` "+
-        "WHERE TABLE_NAME SIMILAR TO '%(H|I)E%' AND TABLE_NAME NOT SIMILAR TO 'C%'")
+        "WHERE TABLE_NAME SIMILAR TO '%(H|I)E%' AND TABLE_NAME NOT SIMILAR TO 'C%' ORDER BY TABLE_NAME")
       .returns(
         "TABLE_NAME=SCHEMATA\n" +
         "TABLE_NAME=VIEWS\n"
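----------------------------------------------------------------------

The chunking that the new getTablesByNamesByBulkLoadHelper introduces is easiest to see in isolation. The sketch below is illustrative only and is not the committed Drill code: the BulkLoadSketch class and the fetchBulk callback are hypothetical stand-ins for DrillHiveMetaStoreClient and the metastore call, but the sub-list arithmetic matches the loop added above.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class BulkLoadSketch {

  // Walks tableNames in chunks of at most bulkSize and concatenates the per-chunk results.
  static <T> List<T> loadInBulk(List<String> tableNames, int bulkSize,
                                Function<List<String>, List<T>> fetchBulk) {
    final List<T> results = new ArrayList<>();
    for (int fromIndex = 0; fromIndex < tableNames.size(); fromIndex += bulkSize) {
      final int toIndex = Math.min(fromIndex + bulkSize, tableNames.size());
      // One bounded round trip per chunk instead of one huge request or one call per table.
      results.addAll(fetchBulk.apply(tableNames.subList(fromIndex, toIndex)));
    }
    return results;
  }

  public static void main(String[] args) {
    List<String> names = Arrays.asList("t1", "t2", "t3", "t4", "t5");
    // With bulkSize = 2 the fetcher sees [t1, t2], [t3, t4], [t5].
    List<String> fetched = loadInBulk(names, 2, chunk -> new ArrayList<>(chunk));
    System.out.println(fetched); // prints [t1, t2, t3, t4, t5]
  }
}

In the patch itself, whether INFORMATION_SCHEMA.TABLES takes this path and the chunk size it uses are controlled by the exec.enable_bulk_load_table_list and exec.bulk_load_table_list.bulk_size options added above (the bulk size defaults to 1000).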
