http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java new file mode 100644 index 0000000..36fb50d --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -0,0 +1,2223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore; + +import static org.apache.commons.lang.StringUtils.join; +import static org.apache.commons.lang.StringUtils.repeat; + +import java.sql.Blob; +import java.sql.Clob; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import javax.jdo.PersistenceManager; +import javax.jdo.Query; +import javax.jdo.Transaction; +import javax.jdo.datastore.JDOConnection; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.AggregateStatsCache.AggrColStats; +import org.apache.hadoop.hive.metastore.api.AggrStats; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PrincipalType; +import org.apache.hadoop.hive.metastore.api.SQLForeignKey; +import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; +import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; +import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.SkewedInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import 
org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.model.MConstraint; +import org.apache.hadoop.hive.metastore.model.MDatabase; +import org.apache.hadoop.hive.metastore.model.MNotificationLog; +import org.apache.hadoop.hive.metastore.model.MNotificationNextId; +import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics; +import org.apache.hadoop.hive.metastore.model.MTableColumnStatistics; +import org.apache.hadoop.hive.metastore.model.MWMResourcePlan; +import org.apache.hadoop.hive.metastore.parser.ExpressionTree; +import org.apache.hadoop.hive.metastore.parser.ExpressionTree.FilterBuilder; +import org.apache.hadoop.hive.metastore.parser.ExpressionTree.LeafNode; +import org.apache.hadoop.hive.metastore.parser.ExpressionTree.LogicalOperator; +import org.apache.hadoop.hive.metastore.parser.ExpressionTree.Operator; +import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeNode; +import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeVisitor; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hive.common.util.BloomFilter; +import org.datanucleus.store.rdbms.query.ForwardQueryResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +/** + * This class contains the optimizations for MetaStore that rely on direct SQL access to + * the underlying database. It should use ANSI SQL and be compatible with common databases + * such as MySQL (note that MySQL doesn't use full ANSI mode by default), Postgres, etc. + * + * As of now, only the partition retrieval is done this way to improve job startup time; + * JDOQL partition retrieval is still present so as not to limit the ORM solution we have + * to SQL stores only. There's always a way to do without direct SQL. + */ +class MetaStoreDirectSql { + private static final int NO_BATCHING = -1, DETECT_BATCHING = 0; + + private static final Logger LOG = LoggerFactory.getLogger(MetaStoreDirectSql.class); + private final PersistenceManager pm; + private final String schema; + + /** + * We want to avoid db-specific code in this class and stick with ANSI SQL. However: + * 1) mysql and postgres are differently ansi-incompatible (mysql by default doesn't support + * quoted identifiers, and postgres contravenes ANSI by coercing unquoted ones to lower case). + * MySQL's way of working around this is simpler (just set ansi quotes mode on), so we will + * use that. MySQL detection is done by actually issuing the set-ansi-quotes command; + * + * Use sparingly, we don't want to devolve into another DataNucleus... + */ + private final DatabaseProduct dbType; + private final int batchSize; + private final boolean convertMapNullsToEmptyStrings; + private final String defaultPartName; + + /** + * Whether direct SQL can be used with the current datastore backing {@link #pm}. 
+ */ + private final boolean isCompatibleDatastore; + private final boolean isAggregateStatsCacheEnabled; + private AggregateStatsCache aggrStatsCache; + + @java.lang.annotation.Target(java.lang.annotation.ElementType.FIELD) + @java.lang.annotation.Retention(java.lang.annotation.RetentionPolicy.RUNTIME) + private @interface TableName {} + + // Table names with schema name, if necessary + @TableName + private String DBS, TBLS, PARTITIONS, DATABASE_PARAMS, PARTITION_PARAMS, SORT_COLS, SD_PARAMS, + SDS, SERDES, SKEWED_STRING_LIST_VALUES, SKEWED_VALUES, BUCKETING_COLS, SKEWED_COL_NAMES, + SKEWED_COL_VALUE_LOC_MAP, COLUMNS_V2, SERDE_PARAMS, PART_COL_STATS, KEY_CONSTRAINTS, + TAB_COL_STATS, PARTITION_KEY_VALS; + + public MetaStoreDirectSql(PersistenceManager pm, Configuration conf, String schema) { + this.pm = pm; + this.schema = schema; + DatabaseProduct dbType = null; + try { + dbType = DatabaseProduct.determineDatabaseProduct(getProductName(pm)); + } catch (SQLException e) { + LOG.warn("Cannot determine database product; assuming OTHER", e); + dbType = DatabaseProduct.OTHER; + } + this.dbType = dbType; + int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_PARTITION_BATCH_SIZE); + if (batchSize == DETECT_BATCHING) { + batchSize = DatabaseProduct.needsInBatching(dbType) ? 1000 : NO_BATCHING; + } + this.batchSize = batchSize; + + for (java.lang.reflect.Field f : this.getClass().getDeclaredFields()) { + if (f.getAnnotation(TableName.class) == null) continue; + try { + f.set(this, getFullyQualifiedName(schema, f.getName())); + } catch (IllegalArgumentException | IllegalAccessException e) { + throw new RuntimeException("Internal error, cannot set " + f.getName()); + } + } + + convertMapNullsToEmptyStrings = + MetastoreConf.getBoolVar(conf, ConfVars.ORM_RETRIEVE_MAPNULLS_AS_EMPTY_STRINGS); + defaultPartName = MetastoreConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME); + + String jdoIdFactory = MetastoreConf.getVar(conf, ConfVars.IDENTIFIER_FACTORY); + if (! ("datanucleus1".equalsIgnoreCase(jdoIdFactory))){ + LOG.warn("Underlying metastore does not use 'datanucleus1' for its ORM naming scheme." + + " Disabling directSQL as it uses hand-hardcoded SQL with that assumption."); + isCompatibleDatastore = false; + } else { + isCompatibleDatastore = ensureDbInit() && runTestQuery(); + if (isCompatibleDatastore) { + LOG.info("Using direct SQL, underlying DB is " + dbType); + } + } + + isAggregateStatsCacheEnabled = MetastoreConf.getBoolVar( + conf, ConfVars.AGGREGATE_STATS_CACHE_ENABLED); + if (isAggregateStatsCacheEnabled) { + aggrStatsCache = AggregateStatsCache.getInstance(conf); + } + } + + private static String getFullyQualifiedName(String schema, String tblName) { + return ((schema == null || schema.isEmpty()) ? "" : "\"" + schema + "\".\"") + + "\"" + tblName + "\""; + } + + + public MetaStoreDirectSql(PersistenceManager pm, Configuration conf) { + this(pm, conf, ""); + } + + static String getProductName(PersistenceManager pm) { + JDOConnection jdoConn = pm.getDataStoreConnection(); + try { + return ((Connection)jdoConn.getNativeConnection()).getMetaData().getDatabaseProductName(); + } catch (Throwable t) { + LOG.warn("Error retrieving product name", t); + return null; + } finally { + jdoConn.close(); // We must release the connection before we call other pm methods. 
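      // A minimal sketch of the borrow/release pattern used throughout this class,
      // assuming only standard JDO semantics (nothing below is metastore-specific):
      // the JDOConnection is the connection DataNucleus manages for this
      // PersistenceManager, so raw JDBC work must always be bracketed like
      //
      //   JDOConnection jdoConn = pm.getDataStoreConnection();
      //   try {
      //     Connection conn = (Connection) jdoConn.getNativeConnection();
      //     // ... raw JDBC only; no pm.newQuery()/transaction calls in here ...
      //   } finally {
      //     jdoConn.close(); // hand the connection back before using pm again
      //   }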
+ } + } + + private boolean ensureDbInit() { + Transaction tx = pm.currentTransaction(); + boolean doCommit = false; + if (!tx.isActive()) { + tx.begin(); + doCommit = true; + } + LinkedList<Query> initQueries = new LinkedList<>(); + + try { + // Force the underlying db to initialize. + initQueries.add(pm.newQuery(MDatabase.class, "name == ''")); + initQueries.add(pm.newQuery(MTableColumnStatistics.class, "dbName == ''")); + initQueries.add(pm.newQuery(MPartitionColumnStatistics.class, "dbName == ''")); + initQueries.add(pm.newQuery(MConstraint.class, "childIntegerIndex < 0")); + initQueries.add(pm.newQuery(MNotificationLog.class, "dbName == ''")); + initQueries.add(pm.newQuery(MNotificationNextId.class, "nextEventId < -1")); + initQueries.add(pm.newQuery(MWMResourcePlan.class, "name == ''")); + Query q; + while ((q = initQueries.peekFirst()) != null) { + q.execute(); + initQueries.pollFirst(); + } + + return true; + } catch (Exception ex) { + doCommit = false; + LOG.warn("Database initialization failed; direct SQL is disabled", ex); + tx.rollback(); + return false; + } finally { + if (doCommit) { + tx.commit(); + } + for (Query q : initQueries) { + try { + q.closeAll(); + } catch (Throwable t) { + } + } + } + } + + private boolean runTestQuery() { + Transaction tx = pm.currentTransaction(); + boolean doCommit = false; + if (!tx.isActive()) { + tx.begin(); + doCommit = true; + } + Query query = null; + // Run a self-test query. If it doesn't work, we will self-disable. What a PITA... + String selfTestQuery = "select \"DB_ID\" from " + DBS + ""; + try { + prepareTxn(); + query = pm.newQuery("javax.jdo.query.SQL", selfTestQuery); + query.execute(); + return true; + } catch (Throwable t) { + doCommit = false; + LOG.warn("Self-test query [" + selfTestQuery + "] failed; direct SQL is disabled", t); + tx.rollback(); + return false; + } finally { + if (doCommit) { + tx.commit(); + } + if (query != null) { + query.closeAll(); + } + } + } + + public String getSchema() { + return schema; + } + + public boolean isCompatibleDatastore() { + return isCompatibleDatastore; + } + + private void executeNoResult(final String queryText) throws SQLException { + JDOConnection jdoConn = pm.getDataStoreConnection(); + Statement statement = null; + boolean doTrace = LOG.isDebugEnabled(); + try { + long start = doTrace ? System.nanoTime() : 0; + statement = ((Connection)jdoConn.getNativeConnection()).createStatement(); + statement.execute(queryText); + timingTrace(doTrace, queryText, start, doTrace ? System.nanoTime() : 0); + } finally { + if(statement != null){ + statement.close(); + } + jdoConn.close(); // We must release the connection before we call other pm methods. + } + } + + public Database getDatabase(String dbName) throws MetaException{ + Query queryDbSelector = null; + Query queryDbParams = null; + try { + dbName = dbName.toLowerCase(); + + String queryTextDbSelector= "select " + + "\"DB_ID\", \"NAME\", \"DB_LOCATION_URI\", \"DESC\", " + + "\"OWNER_NAME\", \"OWNER_TYPE\" " + + "FROM "+ DBS +" where \"NAME\" = ? 
"; + Object[] params = new Object[] { dbName }; + queryDbSelector = pm.newQuery("javax.jdo.query.SQL", queryTextDbSelector); + + if (LOG.isTraceEnabled()) { + LOG.trace("getDatabase:query instantiated : " + queryTextDbSelector + + " with param [" + params[0] + "]"); + } + + List<Object[]> sqlResult = executeWithArray( + queryDbSelector, params, queryTextDbSelector); + if ((sqlResult == null) || sqlResult.isEmpty()) { + return null; + } + + assert(sqlResult.size() == 1); + if (sqlResult.get(0) == null) { + return null; + } + + Object[] dbline = sqlResult.get(0); + Long dbid = extractSqlLong(dbline[0]); + + String queryTextDbParams = "select \"PARAM_KEY\", \"PARAM_VALUE\" " + + " from " + DATABASE_PARAMS + " " + + " WHERE \"DB_ID\" = ? " + + " AND \"PARAM_KEY\" IS NOT NULL"; + params[0] = dbid; + queryDbParams = pm.newQuery("javax.jdo.query.SQL", queryTextDbParams); + if (LOG.isTraceEnabled()) { + LOG.trace("getDatabase:query2 instantiated : " + queryTextDbParams + + " with param [" + params[0] + "]"); + } + + Map<String,String> dbParams = new HashMap<String,String>(); + List<Object[]> sqlResult2 = ensureList(executeWithArray( + queryDbParams, params, queryTextDbParams)); + if (!sqlResult2.isEmpty()) { + for (Object[] line : sqlResult2) { + dbParams.put(extractSqlString(line[0]), extractSqlString(line[1])); + } + } + Database db = new Database(); + db.setName(extractSqlString(dbline[1])); + db.setLocationUri(extractSqlString(dbline[2])); + db.setDescription(extractSqlString(dbline[3])); + db.setOwnerName(extractSqlString(dbline[4])); + String type = extractSqlString(dbline[5]); + db.setOwnerType( + (null == type || type.trim().isEmpty()) ? null : PrincipalType.valueOf(type)); + db.setParameters(MetaStoreUtils.trimMapNulls(dbParams,convertMapNullsToEmptyStrings)); + if (LOG.isDebugEnabled()){ + LOG.debug("getDatabase: directsql returning db " + db.getName() + + " locn["+db.getLocationUri() +"] desc [" +db.getDescription() + + "] owner [" + db.getOwnerName() + "] ownertype ["+ db.getOwnerType() +"]"); + } + return db; + } finally { + if (queryDbSelector != null){ + queryDbSelector.closeAll(); + } + if (queryDbParams != null){ + queryDbParams.closeAll(); + } + } + } + + /** + * Gets partitions by using direct SQL queries. + * Note that batching is not needed for this method - list of names implies the batch size; + * @param dbName Metastore db name. + * @param tblName Metastore table name. + * @param partNames Partition names to get. + * @return List of partitions. + */ + public List<Partition> getPartitionsViaSqlFilter(final String dbName, final String tblName, + List<String> partNames) throws MetaException { + if (partNames.isEmpty()) { + return Collections.emptyList(); + } + return runBatched(partNames, new Batchable<String, Partition>() { + @Override + public List<Partition> run(List<String> input) throws MetaException { + String filter = "" + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(input.size()) + ")"; + return getPartitionsViaSqlFilterInternal(dbName, tblName, null, filter, input, + Collections.<String>emptyList(), null); + } + }); + } + + /** + * Gets partitions by using direct SQL queries. + * @param filter The filter. + * @param max The maximum number of partitions to return. + * @return List of partitions. 
+ */ + public List<Partition> getPartitionsViaSqlFilter( + SqlFilterForPushdown filter, Integer max) throws MetaException { + Boolean isViewTable = isViewTable(filter.table); + return getPartitionsViaSqlFilterInternal(filter.table.getDbName(), filter.table.getTableName(), + isViewTable, filter.filter, filter.params, filter.joins, max); + } + + public static class SqlFilterForPushdown { + private final List<Object> params = new ArrayList<Object>(); + private final List<String> joins = new ArrayList<String>(); + private String filter; + private Table table; + } + + public boolean generateSqlFilterForPushdown( + Table table, ExpressionTree tree, SqlFilterForPushdown result) throws MetaException { + // Derby and Oracle do not interpret filters ANSI-properly in some cases and need a workaround. + boolean dbHasJoinCastBug = DatabaseProduct.hasJoinOperationOrderBug(dbType); + result.table = table; + result.filter = PartitionFilterGenerator.generateSqlFilter(table, tree, result.params, + result.joins, dbHasJoinCastBug, defaultPartName, dbType, schema); + return result.filter != null; + } + + /** + * Gets all partitions of a table by using direct SQL queries. + * @param dbName Metastore db name. + * @param tblName Metastore table name. + * @param max The maximum number of partitions to return. + * @return List of partitions. + */ + public List<Partition> getPartitions( + String dbName, String tblName, Integer max) throws MetaException { + return getPartitionsViaSqlFilterInternal(dbName, tblName, null, + null, Collections.<String>emptyList(), Collections.<String>emptyList(), max); + } + + private static Boolean isViewTable(Table t) { + return t.isSetTableType() ? + t.getTableType().equals(TableType.VIRTUAL_VIEW.toString()) : null; + } + + private boolean isViewTable(String dbName, String tblName) throws MetaException { + Query query = null; + try { + String queryText = "select \"TBL_TYPE\" from " + TBLS + "" + + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" " + + " where " + TBLS + ".\"TBL_NAME\" = ? and " + DBS + ".\"NAME\" = ?"; + Object[] params = new Object[] { tblName, dbName }; + query = pm.newQuery("javax.jdo.query.SQL", queryText); + query.setUnique(true); + Object result = executeWithArray(query, params, queryText); + return (result != null) && result.toString().equals(TableType.VIRTUAL_VIEW.toString()); + } finally { + if (query != null) { + query.closeAll(); + } + } + } + + /** + * Get partition objects for the query using direct SQL queries, to avoid bazillion + * queries created by DN retrieving stuff for each object individually. + * @param dbName Metastore db name. + * @param tblName Metastore table name. + * @param isView Whether table is a view. Can be passed as null if not immediately + * known, then this method will get it only if necessary. + * @param sqlFilter SQL filter to use. Better be SQL92-compliant. + * @param paramsForFilter params for ?-s in SQL filter text. Params must be in order. + * @param joinsForFilter if the filter needs additional join statement, they must be in + * this list. Better be SQL92-compliant. + * @param max The maximum number of partitions to return. + * @return List of partition objects. + */ + private List<Partition> getPartitionsViaSqlFilterInternal(String dbName, String tblName, + final Boolean isView, String sqlFilter, List<? 
extends Object> paramsForFilter,
+      List<String> joinsForFilter, Integer max) throws MetaException {
+    boolean doTrace = LOG.isDebugEnabled();
+    final String dbNameLcase = dbName.toLowerCase(), tblNameLcase = tblName.toLowerCase();
+    // We have to be mindful of order during filtering if we are not returning all partitions.
+    String orderForFilter = (max != null) ? " order by \"PART_NAME\" asc" : "";
+
+    // Get all simple fields for partitions and related objects, which we can map one-to-one.
+    // We will do this in 2 queries to use different existing indices for each one.
+    // We do not get table and DB name, assuming they are the same as we are using to filter.
+    // TODO: We might want to tune the indexes instead. With the current ones MySQL performs
+    // poorly, esp. with 'order by' and no index on large tables, even if the number of actual
+    // results is small: a query that returns 8 out of 32k partitions can go from 4sec to 0sec
+    // just by adding a \"PART_ID\" IN (...) filter that doesn't alter the results, probably
+    // because without it MySQL doesn't know how selective the filter is and sorts the entire table.
+    String queryText =
+        "select " + PARTITIONS + ".\"PART_ID\" from " + PARTITIONS + ""
+        + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\" "
+        + " and " + TBLS + ".\"TBL_NAME\" = ? "
+        + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" "
+        + " and " + DBS + ".\"NAME\" = ? "
+        + join(joinsForFilter, ' ')
+        + (StringUtils.isBlank(sqlFilter) ? "" : (" where " + sqlFilter)) + orderForFilter;
+    Object[] params = new Object[paramsForFilter.size() + 2];
+    params[0] = tblNameLcase;
+    params[1] = dbNameLcase;
+    for (int i = 0; i < paramsForFilter.size(); ++i) {
+      params[i + 2] = paramsForFilter.get(i);
+    }
+
+    long start = doTrace ? System.nanoTime() : 0;
+    Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
+    if (max != null) {
+      query.setRange(0, max.shortValue());
+    }
+    List<Object> sqlResult = executeWithArray(query, params, queryText);
+    long queryTime = doTrace ? System.nanoTime() : 0;
+    timingTrace(doTrace, queryText, start, queryTime);
+    if (sqlResult.isEmpty()) {
+      return Collections.emptyList(); // no partitions, bail early.
+    }
+
+    // Get full objects. For Oracle/etc. do it in batches.
+    List<Partition> result = runBatched(sqlResult, new Batchable<Object, Partition>() {
+      @Override
+      public List<Partition> run(List<Object> input) throws MetaException {
+        return getPartitionsFromPartitionIds(dbNameLcase, tblNameLcase, isView, input);
+      }
+    });
+
+    query.closeAll();
+    return result;
+  }
+
+  /** Should be called with the list short enough to not trip up Oracle/etc. */
+  private List<Partition> getPartitionsFromPartitionIds(String dbName, String tblName,
+      Boolean isView, List<Object> partIdList) throws MetaException {
+    boolean doTrace = LOG.isDebugEnabled();
+    int idStringWidth = (int)Math.ceil(Math.log10(partIdList.size())) + 1; // 1 for comma
+    int sbCapacity = partIdList.size() * idStringWidth;
+    // Prepare StringBuilder for "PART_ID in (...)" to use in future queries.
+    StringBuilder partSb = new StringBuilder(sbCapacity);
+    for (Object partitionId : partIdList) {
+      partSb.append(extractSqlLong(partitionId)).append(",");
+    }
+    String partIds = trimCommaList(partSb);
+
+    // Get most of the fields for the IDs provided.
+    // Assume db and table names are the same for all partitions, as provided in arguments.
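    // For concreteness, a worked sketch with hypothetical ids: for a partIdList of
    // [101, 102, 103] the loop above builds "101,102,103," and trimCommaList()
    // yields partIds = "101,102,103", so the query below ends in
    //   where "PART_ID" in (101,102,103) order by "PART_NAME" asc
    // The ids are inlined rather than bound as ?-parameters because the list
    // length varies per batch and the values come from our own PART_ID query.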
+ String queryText = + "select " + PARTITIONS + ".\"PART_ID\", " + SDS + ".\"SD_ID\", " + SDS + ".\"CD_ID\"," + + " " + SERDES + ".\"SERDE_ID\", " + PARTITIONS + ".\"CREATE_TIME\"," + + " " + PARTITIONS + ".\"LAST_ACCESS_TIME\", " + SDS + ".\"INPUT_FORMAT\", " + SDS + ".\"IS_COMPRESSED\"," + + " " + SDS + ".\"IS_STOREDASSUBDIRECTORIES\", " + SDS + ".\"LOCATION\", " + SDS + ".\"NUM_BUCKETS\"," + + " " + SDS + ".\"OUTPUT_FORMAT\", " + SERDES + ".\"NAME\", " + SERDES + ".\"SLIB\" " + + "from " + PARTITIONS + "" + + " left outer join " + SDS + " on " + PARTITIONS + ".\"SD_ID\" = " + SDS + ".\"SD_ID\" " + + " left outer join " + SERDES + " on " + SDS + ".\"SERDE_ID\" = " + SERDES + ".\"SERDE_ID\" " + + "where \"PART_ID\" in (" + partIds + ") order by \"PART_NAME\" asc"; + long start = doTrace ? System.nanoTime() : 0; + Query query = pm.newQuery("javax.jdo.query.SQL", queryText); + List<Object[]> sqlResult = executeWithArray(query, null, queryText); + long queryTime = doTrace ? System.nanoTime() : 0; + Deadline.checkTimeout(); + + // Read all the fields and create partitions, SDs and serdes. + TreeMap<Long, Partition> partitions = new TreeMap<Long, Partition>(); + TreeMap<Long, StorageDescriptor> sds = new TreeMap<Long, StorageDescriptor>(); + TreeMap<Long, SerDeInfo> serdes = new TreeMap<Long, SerDeInfo>(); + TreeMap<Long, List<FieldSchema>> colss = new TreeMap<Long, List<FieldSchema>>(); + // Keep order by name, consistent with JDO. + ArrayList<Partition> orderedResult = new ArrayList<Partition>(partIdList.size()); + + // Prepare StringBuilder-s for "in (...)" lists to use in one-to-many queries. + StringBuilder sdSb = new StringBuilder(sbCapacity), serdeSb = new StringBuilder(sbCapacity); + StringBuilder colsSb = new StringBuilder(7); // We expect that there's only one field schema. + tblName = tblName.toLowerCase(); + dbName = dbName.toLowerCase(); + for (Object[] fields : sqlResult) { + // Here comes the ugly part... + long partitionId = extractSqlLong(fields[0]); + Long sdId = extractSqlLong(fields[1]); + Long colId = extractSqlLong(fields[2]); + Long serdeId = extractSqlLong(fields[3]); + // A partition must have at least sdId and serdeId set, or nothing set if it's a view. + if (sdId == null || serdeId == null) { + if (isView == null) { + isView = isViewTable(dbName, tblName); + } + if ((sdId != null || colId != null || serdeId != null) || !isView) { + throw new MetaException("Unexpected null for one of the IDs, SD " + sdId + + ", serde " + serdeId + " for a " + (isView ? "" : "non-") + " view"); + } + } + + Partition part = new Partition(); + orderedResult.add(part); + // Set the collection fields; some code might not check presence before accessing them. + part.setParameters(new HashMap<String, String>()); + part.setValues(new ArrayList<String>()); + part.setDbName(dbName); + part.setTableName(tblName); + if (fields[4] != null) part.setCreateTime(extractSqlInt(fields[4])); + if (fields[5] != null) part.setLastAccessTime(extractSqlInt(fields[5])); + partitions.put(partitionId, part); + + if (sdId == null) continue; // Probably a view. + assert serdeId != null; + + // We assume each partition has an unique SD. + StorageDescriptor sd = new StorageDescriptor(); + StorageDescriptor oldSd = sds.put(sdId, sd); + if (oldSd != null) { + throw new MetaException("Partitions reuse SDs; we don't expect that"); + } + // Set the collection fields; some code might not check presence before accessing them. 
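    // A sketch of the failure mode the pre-setting below avoids, assuming
    // standard Thrift-generated accessors (unset collection fields read as null):
    //   StorageDescriptor sd = new StorageDescriptor();
    //   sd.getSortCols().isEmpty();              // NullPointerException
    //   sd.setSortCols(new ArrayList<Order>());
    //   sd.getSortCols().isEmpty();              // true, as callers expect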
+ sd.setSortCols(new ArrayList<Order>()); + sd.setBucketCols(new ArrayList<String>()); + sd.setParameters(new HashMap<String, String>()); + sd.setSkewedInfo(new SkewedInfo(new ArrayList<String>(), + new ArrayList<List<String>>(), new HashMap<List<String>, String>())); + sd.setInputFormat((String)fields[6]); + Boolean tmpBoolean = extractSqlBoolean(fields[7]); + if (tmpBoolean != null) sd.setCompressed(tmpBoolean); + tmpBoolean = extractSqlBoolean(fields[8]); + if (tmpBoolean != null) sd.setStoredAsSubDirectories(tmpBoolean); + sd.setLocation((String)fields[9]); + if (fields[10] != null) sd.setNumBuckets(extractSqlInt(fields[10])); + sd.setOutputFormat((String)fields[11]); + sdSb.append(sdId).append(","); + part.setSd(sd); + + if (colId != null) { + List<FieldSchema> cols = colss.get(colId); + // We expect that colId will be the same for all (or many) SDs. + if (cols == null) { + cols = new ArrayList<FieldSchema>(); + colss.put(colId, cols); + colsSb.append(colId).append(","); + } + sd.setCols(cols); + } + + // We assume each SD has an unique serde. + SerDeInfo serde = new SerDeInfo(); + SerDeInfo oldSerde = serdes.put(serdeId, serde); + if (oldSerde != null) { + throw new MetaException("SDs reuse serdes; we don't expect that"); + } + serde.setParameters(new HashMap<String, String>()); + serde.setName((String)fields[12]); + serde.setSerializationLib((String)fields[13]); + serdeSb.append(serdeId).append(","); + sd.setSerdeInfo(serde); + Deadline.checkTimeout(); + } + query.closeAll(); + timingTrace(doTrace, queryText, start, queryTime); + + // Now get all the one-to-many things. Start with partitions. + queryText = "select \"PART_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from " + PARTITION_PARAMS + "" + + " where \"PART_ID\" in (" + partIds + ") and \"PARAM_KEY\" is not null" + + " order by \"PART_ID\" asc"; + loopJoinOrderedResult(partitions, queryText, 0, new ApplyFunc<Partition>() { + @Override + public void apply(Partition t, Object[] fields) { + t.putToParameters((String)fields[1], (String)fields[2]); + }}); + // Perform conversion of null map values + for (Partition t : partitions.values()) { + t.setParameters(MetaStoreUtils.trimMapNulls(t.getParameters(), convertMapNullsToEmptyStrings)); + } + + queryText = "select \"PART_ID\", \"PART_KEY_VAL\" from " + PARTITION_KEY_VALS + "" + + " where \"PART_ID\" in (" + partIds + ")" + + " order by \"PART_ID\" asc, \"INTEGER_IDX\" asc"; + loopJoinOrderedResult(partitions, queryText, 0, new ApplyFunc<Partition>() { + @Override + public void apply(Partition t, Object[] fields) { + t.addToValues((String)fields[1]); + }}); + + // Prepare IN (blah) lists for the following queries. Cut off the final ','s. + if (sdSb.length() == 0) { + assert serdeSb.length() == 0 && colsSb.length() == 0; + return orderedResult; // No SDs, probably a view. + } + + String sdIds = trimCommaList(sdSb); + String serdeIds = trimCommaList(serdeSb); + String colIds = trimCommaList(colsSb); + + // Get all the stuff for SD. Don't do empty-list check - we expect partitions do have SDs. 
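    // Each loopJoinOrderedResult() call in this method is a streamed merge join:
    // the TreeMap iterates ids in ascending order and each query is ordered by
    // the same id column, so a single linear pass attaches child rows to their
    // parents. A worked sketch with hypothetical rows for the query below:
    //   tree: {4 -> sd4, 7 -> sd7}    rows: [4,k1,v1], [4,k2,v2], [7,k1,v3]
    //   pass: both id-4 rows apply to sd4, then the id-7 row applies to sd7;
    //   a row id below the current key would mean an unknown ID -> MetaException.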
+ queryText = "select \"SD_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from " + SD_PARAMS + "" + + " where \"SD_ID\" in (" + sdIds + ") and \"PARAM_KEY\" is not null" + + " order by \"SD_ID\" asc"; + loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() { + @Override + public void apply(StorageDescriptor t, Object[] fields) { + t.putToParameters((String)fields[1], extractSqlClob(fields[2])); + }}); + // Perform conversion of null map values + for (StorageDescriptor t : sds.values()) { + t.setParameters(MetaStoreUtils.trimMapNulls(t.getParameters(), convertMapNullsToEmptyStrings)); + } + + queryText = "select \"SD_ID\", \"COLUMN_NAME\", " + SORT_COLS + ".\"ORDER\"" + + " from " + SORT_COLS + "" + + " where \"SD_ID\" in (" + sdIds + ")" + + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc"; + loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() { + @Override + public void apply(StorageDescriptor t, Object[] fields) { + if (fields[2] == null) return; + t.addToSortCols(new Order((String)fields[1], extractSqlInt(fields[2]))); + }}); + + queryText = "select \"SD_ID\", \"BUCKET_COL_NAME\" from " + BUCKETING_COLS + "" + + " where \"SD_ID\" in (" + sdIds + ")" + + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc"; + loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() { + @Override + public void apply(StorageDescriptor t, Object[] fields) { + t.addToBucketCols((String)fields[1]); + }}); + + // Skewed columns stuff. + queryText = "select \"SD_ID\", \"SKEWED_COL_NAME\" from " + SKEWED_COL_NAMES + "" + + " where \"SD_ID\" in (" + sdIds + ")" + + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc"; + boolean hasSkewedColumns = + loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() { + @Override + public void apply(StorageDescriptor t, Object[] fields) { + if (!t.isSetSkewedInfo()) t.setSkewedInfo(new SkewedInfo()); + t.getSkewedInfo().addToSkewedColNames((String)fields[1]); + }}) > 0; + + // Assume we don't need to fetch the rest of the skewed column data if we have no columns. + if (hasSkewedColumns) { + // We are skipping the SKEWED_STRING_LIST table here, as it seems to be totally useless. + queryText = + "select " + SKEWED_VALUES + ".\"SD_ID_OID\"," + + " " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\"," + + " " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_VALUE\" " + + "from " + SKEWED_VALUES + " " + + " left outer join " + SKEWED_STRING_LIST_VALUES + " on " + SKEWED_VALUES + "." + + "\"STRING_LIST_ID_EID\" = " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\" " + + "where " + SKEWED_VALUES + ".\"SD_ID_OID\" in (" + sdIds + ") " + + " and " + SKEWED_VALUES + ".\"STRING_LIST_ID_EID\" is not null " + + " and " + SKEWED_VALUES + ".\"INTEGER_IDX\" >= 0 " + + "order by " + SKEWED_VALUES + ".\"SD_ID_OID\" asc, " + SKEWED_VALUES + ".\"INTEGER_IDX\" asc," + + " " + SKEWED_STRING_LIST_VALUES + ".\"INTEGER_IDX\" asc"; + loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() { + private Long currentListId; + private List<String> currentList; + @Override + public void apply(StorageDescriptor t, Object[] fields) throws MetaException { + if (!t.isSetSkewedInfo()) t.setSkewedInfo(new SkewedInfo()); + // Note that this is not a typical list accumulator - there's no call to finalize + // the last list. Instead we add list to SD first, as well as locally to add elements. 
+ if (fields[1] == null) { + currentList = null; // left outer join produced a list with no values + currentListId = null; + t.getSkewedInfo().addToSkewedColValues(Collections.<String>emptyList()); + } else { + long fieldsListId = extractSqlLong(fields[1]); + if (currentListId == null || fieldsListId != currentListId) { + currentList = new ArrayList<String>(); + currentListId = fieldsListId; + t.getSkewedInfo().addToSkewedColValues(currentList); + } + currentList.add((String)fields[2]); + } + }}); + + // We are skipping the SKEWED_STRING_LIST table here, as it seems to be totally useless. + queryText = + "select " + SKEWED_COL_VALUE_LOC_MAP + ".\"SD_ID\"," + + " " + SKEWED_STRING_LIST_VALUES + ".STRING_LIST_ID," + + " " + SKEWED_COL_VALUE_LOC_MAP + ".\"LOCATION\"," + + " " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_VALUE\" " + + "from " + SKEWED_COL_VALUE_LOC_MAP + "" + + " left outer join " + SKEWED_STRING_LIST_VALUES + " on " + SKEWED_COL_VALUE_LOC_MAP + "." + + "\"STRING_LIST_ID_KID\" = " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\" " + + "where " + SKEWED_COL_VALUE_LOC_MAP + ".\"SD_ID\" in (" + sdIds + ")" + + " and " + SKEWED_COL_VALUE_LOC_MAP + ".\"STRING_LIST_ID_KID\" is not null " + + "order by " + SKEWED_COL_VALUE_LOC_MAP + ".\"SD_ID\" asc," + + " " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\" asc," + + " " + SKEWED_STRING_LIST_VALUES + ".\"INTEGER_IDX\" asc"; + + loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() { + private Long currentListId; + private List<String> currentList; + @Override + public void apply(StorageDescriptor t, Object[] fields) throws MetaException { + if (!t.isSetSkewedInfo()) { + SkewedInfo skewedInfo = new SkewedInfo(); + skewedInfo.setSkewedColValueLocationMaps(new HashMap<List<String>, String>()); + t.setSkewedInfo(skewedInfo); + } + Map<List<String>, String> skewMap = t.getSkewedInfo().getSkewedColValueLocationMaps(); + // Note that this is not a typical list accumulator - there's no call to finalize + // the last list. Instead we add list to SD first, as well as locally to add elements. + if (fields[1] == null) { + currentList = new ArrayList<String>(); // left outer join produced a list with no values + currentListId = null; + } else { + long fieldsListId = extractSqlLong(fields[1]); + if (currentListId == null || fieldsListId != currentListId) { + currentList = new ArrayList<String>(); + currentListId = fieldsListId; + } else { + skewMap.remove(currentList); // value based compare.. remove first + } + currentList.add((String)fields[3]); + } + skewMap.put(currentList, (String)fields[2]); + }}); + } // if (hasSkewedColumns) + + // Get FieldSchema stuff if any. + if (!colss.isEmpty()) { + // We are skipping the CDS table here, as it seems to be totally useless. + queryText = "select \"CD_ID\", \"COMMENT\", \"COLUMN_NAME\", \"TYPE_NAME\"" + + " from " + COLUMNS_V2 + " where \"CD_ID\" in (" + colIds + ")" + + " order by \"CD_ID\" asc, \"INTEGER_IDX\" asc"; + loopJoinOrderedResult(colss, queryText, 0, new ApplyFunc<List<FieldSchema>>() { + @Override + public void apply(List<FieldSchema> t, Object[] fields) { + t.add(new FieldSchema((String)fields[2], extractSqlClob(fields[3]), (String)fields[1])); + }}); + } + + // Finally, get all the stuff for serdes - just the params. 
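    // PARAM_VALUE is a large-text column, so some drivers (e.g. on Oracle or
    // Derby) may return it as a java.sql.Clob rather than a String; values are
    // therefore routed through extractSqlClob (defined further down) instead of
    // a plain cast. Roughly:
    //   Object v = fields[2];
    //   String s = (v instanceof Clob)
    //       ? ((Clob) v).getSubString(1L, (int) ((Clob) v).length())
    //       : String.valueOf(v);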
+ queryText = "select \"SERDE_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from " + SERDE_PARAMS + "" + + " where \"SERDE_ID\" in (" + serdeIds + ") and \"PARAM_KEY\" is not null" + + " order by \"SERDE_ID\" asc"; + loopJoinOrderedResult(serdes, queryText, 0, new ApplyFunc<SerDeInfo>() { + @Override + public void apply(SerDeInfo t, Object[] fields) { + t.putToParameters((String)fields[1], extractSqlClob(fields[2])); + }}); + // Perform conversion of null map values + for (SerDeInfo t : serdes.values()) { + t.setParameters(MetaStoreUtils.trimMapNulls(t.getParameters(), convertMapNullsToEmptyStrings)); + } + + return orderedResult; + } + + public int getNumPartitionsViaSqlFilter(SqlFilterForPushdown filter) throws MetaException { + boolean doTrace = LOG.isDebugEnabled(); + String dbName = filter.table.getDbName().toLowerCase(); + String tblName = filter.table.getTableName().toLowerCase(); + + // Get number of partitions by doing count on PART_ID. + String queryText = "select count(" + PARTITIONS + ".\"PART_ID\") from " + PARTITIONS + "" + + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\" " + + " and " + TBLS + ".\"TBL_NAME\" = ? " + + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" " + + " and " + DBS + ".\"NAME\" = ? " + + join(filter.joins, ' ') + + (filter.filter == null || filter.filter.trim().isEmpty() ? "" : (" where " + filter.filter)); + + Object[] params = new Object[filter.params.size() + 2]; + params[0] = tblName; + params[1] = dbName; + for (int i = 0; i < filter.params.size(); ++i) { + params[i + 2] = filter.params.get(i); + } + + long start = doTrace ? System.nanoTime() : 0; + Query query = pm.newQuery("javax.jdo.query.SQL", queryText); + query.setUnique(true); + int sqlResult = extractSqlInt(query.executeWithArray(params)); + long queryTime = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, queryTime); + return sqlResult; + } + + + private void timingTrace(boolean doTrace, String queryText, long start, long queryTime) { + if (!doTrace) return; + LOG.debug("Direct SQL query in " + (queryTime - start) / 1000000.0 + "ms + " + + (System.nanoTime() - queryTime) / 1000000.0 + "ms, the query is [" + queryText + "]"); + } + + static Long extractSqlLong(Object obj) throws MetaException { + if (obj == null) return null; + if (!(obj instanceof Number)) { + throw new MetaException("Expected numeric type but got " + obj.getClass().getName()); + } + return ((Number)obj).longValue(); + } + + private static Boolean extractSqlBoolean(Object value) throws MetaException { + // MySQL has booleans, but e.g. Derby uses 'Y'/'N' mapping. People using derby probably + // don't care about performance anyway, but let's cover the common case. 
+    if (value == null) return null;
+    if (value instanceof Boolean) return (Boolean)value;
+    Character c = null;
+    if (value instanceof String && ((String)value).length() == 1) {
+      c = ((String)value).charAt(0);
+    }
+    if (c == null) return null;
+    if (c == 'Y') return true;
+    if (c == 'N') return false;
+    throw new MetaException("Cannot extract boolean from column value " + value);
+  }
+
+  private int extractSqlInt(Object field) {
+    return ((Number)field).intValue();
+  }
+
+  private String extractSqlString(Object value) {
+    if (value == null) return null;
+    return value.toString();
+  }
+
+  static Double extractSqlDouble(Object obj) throws MetaException {
+    if (obj == null)
+      return null;
+    if (!(obj instanceof Number)) {
+      throw new MetaException("Expected numeric type but got " + obj.getClass().getName());
+    }
+    return ((Number) obj).doubleValue();
+  }
+
+  private String extractSqlClob(Object value) {
+    if (value == null) return null;
+    try {
+      if (value instanceof Clob) {
+        // We trim the Clob value to the max length an int can hold.
+        int maxLength = (((Clob)value).length() < Integer.MAX_VALUE - 2) ? (int)((Clob)value).length() : Integer.MAX_VALUE - 2;
+        return ((Clob)value).getSubString(1L, maxLength);
+      } else {
+        return value.toString();
+      }
+    } catch (SQLException sqle) {
+      return null;
+    }
+  }
+
+  static byte[] extractSqlBlob(Object value) throws MetaException {
+    if (value == null)
+      return null;
+    if (value instanceof Blob) {
+      // derby, oracle
+      try {
+        // Per Blob.getBytes: pos is the ordinal position of the first byte in
+        // the BLOB value to be extracted; the first byte is at position 1.
+        return ((Blob) value).getBytes(1, (int) ((Blob) value).length());
+      } catch (SQLException e) {
+        throw new MetaException("Encountered error while processing blob.");
+      }
+    }
+    else if (value instanceof byte[]) {
+      // mysql, postgres, sql server
+      return (byte[]) value;
+    }
+    else {
+      // This may happen when enableBitVector is false.
+      LOG.debug("Expected blob type but got " + value.getClass().getName());
+      return null;
+    }
+  }
+
+  private static String trimCommaList(StringBuilder sb) {
+    if (sb.length() > 0) {
+      sb.setLength(sb.length() - 1);
+    }
+    return sb.toString();
+  }
+
+  private abstract class ApplyFunc<Target> {
+    public abstract void apply(Target t, Object[] fields) throws MetaException;
+  }
+
+  /**
+   * Merges the result of a PM SQL query into a tree of objects.
+   * Essentially it's an object join. DN could do this for us, but it issues queries
+   * separately for every object, which is suboptimal.
+   * @param tree The object tree, by ID.
+   * @param queryText The query text.
+   * @param keyIndex Index of the Long column corresponding to the map ID in query result rows.
+   * @param func The function that is called on each (object,row) pair with the same id.
+   * @return the count of results returned from the query.
+   */
+  private <T> int loopJoinOrderedResult(TreeMap<Long, T> tree,
+      String queryText, int keyIndex, ApplyFunc<T> func) throws MetaException {
+    boolean doTrace = LOG.isDebugEnabled();
+    long start = doTrace ? System.nanoTime() : 0;
+    Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
+    Object result = query.execute();
+    long queryTime = doTrace ?
System.nanoTime() : 0; + if (result == null) { + query.closeAll(); + return 0; + } + List<Object[]> list = ensureList(result); + Iterator<Object[]> iter = list.iterator(); + Object[] fields = null; + for (Map.Entry<Long, T> entry : tree.entrySet()) { + if (fields == null && !iter.hasNext()) break; + long id = entry.getKey(); + while (fields != null || iter.hasNext()) { + if (fields == null) { + fields = iter.next(); + } + long nestedId = extractSqlLong(fields[keyIndex]); + if (nestedId < id) throw new MetaException("Found entries for unknown ID " + nestedId); + if (nestedId > id) break; // fields belong to one of the next entries + func.apply(entry.getValue(), fields); + fields = null; + } + Deadline.checkTimeout(); + } + int rv = list.size(); + query.closeAll(); + timingTrace(doTrace, queryText, start, queryTime); + return rv; + } + + private static class PartitionFilterGenerator extends TreeVisitor { + private final Table table; + private final FilterBuilder filterBuffer; + private final List<Object> params; + private final List<String> joins; + private final boolean dbHasJoinCastBug; + private final String defaultPartName; + private final DatabaseProduct dbType; + private final String PARTITION_KEY_VALS, PARTITIONS, DBS, TBLS; + + private PartitionFilterGenerator(Table table, List<Object> params, List<String> joins, + boolean dbHasJoinCastBug, String defaultPartName, DatabaseProduct dbType, String schema) { + this.table = table; + this.params = params; + this.joins = joins; + this.dbHasJoinCastBug = dbHasJoinCastBug; + this.filterBuffer = new FilterBuilder(false); + this.defaultPartName = defaultPartName; + this.dbType = dbType; + this.PARTITION_KEY_VALS = getFullyQualifiedName(schema, "PARTITION_KEY_VALS"); + this.PARTITIONS = getFullyQualifiedName(schema, "PARTITIONS"); + this.DBS = getFullyQualifiedName(schema, "DBS"); + this.TBLS = getFullyQualifiedName(schema, "TBLS"); + } + + /** + * Generate the ANSI SQL92 filter for the given expression tree + * @param table the table being queried + * @param params the ordered parameters for the resulting expression + * @param joins the joins necessary for the resulting expression + * @return the string representation of the expression tree + */ + private static String generateSqlFilter(Table table, ExpressionTree tree, List<Object> params, + List<String> joins, boolean dbHasJoinCastBug, String defaultPartName, + DatabaseProduct dbType, String schema) throws MetaException { + assert table != null; + if (tree == null) { + // consistent with other APIs like makeExpressionTree, null is returned to indicate that + // the filter could not pushed down due to parsing issue etc + return null; + } + if (tree.getRoot() == null) { + return ""; + } + PartitionFilterGenerator visitor = new PartitionFilterGenerator( + table, params, joins, dbHasJoinCastBug, defaultPartName, dbType, schema); + tree.accept(visitor); + if (visitor.filterBuffer.hasError()) { + LOG.info("Unable to push down SQL filter: " + visitor.filterBuffer.getErrorMessage()); + return null; + } + + // Some joins might be null (see processNode for LeafNode), clean them up. + for (int i = 0; i < joins.size(); ++i) { + if (joins.get(i) != null) continue; + joins.remove(i--); + } + return "(" + visitor.filterBuffer.getFilter() + ")"; + } + + @Override + protected void beginTreeNode(TreeNode node) throws MetaException { + filterBuffer.append(" ("); + } + + @Override + protected void midTreeNode(TreeNode node) throws MetaException { + filterBuffer.append((node.getAndOr() == LogicalOperator.AND) ? 
" and " : " or "); + } + + @Override + protected void endTreeNode(TreeNode node) throws MetaException { + filterBuffer.append(") "); + } + + @Override + protected boolean shouldStop() { + return filterBuffer.hasError(); + } + + private static enum FilterType { + Integral, + String, + Date, + + Invalid; + + static FilterType fromType(String colTypeStr) { + if (colTypeStr.equals(ColumnType.STRING_TYPE_NAME)) { + return FilterType.String; + } else if (colTypeStr.equals(ColumnType.DATE_TYPE_NAME)) { + return FilterType.Date; + } else if (ColumnType.IntegralTypes.contains(colTypeStr)) { + return FilterType.Integral; + } + return FilterType.Invalid; + } + + public static FilterType fromClass(Object value) { + if (value instanceof String) { + return FilterType.String; + } else if (value instanceof Long) { + return FilterType.Integral; + } else if (value instanceof java.sql.Date) { + return FilterType.Date; + } + return FilterType.Invalid; + } + } + + @Override + public void visit(LeafNode node) throws MetaException { + if (node.operator == Operator.LIKE) { + filterBuffer.setError("LIKE is not supported for SQL filter pushdown"); + return; + } + int partColCount = table.getPartitionKeys().size(); + int partColIndex = node.getPartColIndexForFilter(table, filterBuffer); + if (filterBuffer.hasError()) return; + + // We skipped 'like', other ops should all work as long as the types are right. + String colTypeStr = table.getPartitionKeys().get(partColIndex).getType(); + FilterType colType = FilterType.fromType(colTypeStr); + if (colType == FilterType.Invalid) { + filterBuffer.setError("Filter pushdown not supported for type " + colTypeStr); + return; + } + FilterType valType = FilterType.fromClass(node.value); + Object nodeValue = node.value; + if (valType == FilterType.Invalid) { + filterBuffer.setError("Filter pushdown not supported for value " + node.value.getClass()); + return; + } + + // if Filter.g does date parsing for quoted strings, we'd need to verify there's no + // type mismatch when string col is filtered by a string that looks like date. + if (colType == FilterType.Date && valType == FilterType.String) { + // Filter.g cannot parse a quoted date; try to parse date here too. + try { + nodeValue = new java.sql.Date( + org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.PARTITION_DATE_FORMAT.get().parse((String)nodeValue).getTime()); + valType = FilterType.Date; + } catch (ParseException pe) { // do nothing, handled below - types will mismatch + } + } + + if (colType != valType) { + // It's not clear how filtering for e.g. "stringCol > 5" should work (which side is + // to be coerced?). Let the expression evaluation sort this one out, not metastore. + filterBuffer.setError("Cannot push down filter for " + + colTypeStr + " column and value " + nodeValue.getClass()); + return; + } + + if (joins.isEmpty()) { + // There's a fixed number of partition cols that we might have filters on. To avoid + // joining multiple times for one column (if there are several filters on it), we will + // keep numCols elements in the list, one for each column; we will fill it with nulls, + // put each join at a corresponding index when necessary, and remove nulls in the end. 
+ for (int i = 0; i < partColCount; ++i) { + joins.add(null); + } + } + if (joins.get(partColIndex) == null) { + joins.set(partColIndex, "inner join " + PARTITION_KEY_VALS + " \"FILTER" + partColIndex + + "\" on \"FILTER" + partColIndex + "\".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" + + " and \"FILTER" + partColIndex + "\".\"INTEGER_IDX\" = " + partColIndex); + } + + // Build the filter and add parameters linearly; we are traversing leaf nodes LTR. + String tableValue = "\"FILTER" + partColIndex + "\".\"PART_KEY_VAL\""; + + if (node.isReverseOrder) { + params.add(nodeValue); + } + String tableColumn = tableValue; + if (colType != FilterType.String) { + // The underlying database field is varchar, we need to compare numbers. + if (colType == FilterType.Integral) { + tableValue = "cast(" + tableValue + " as decimal(21,0))"; + } else if (colType == FilterType.Date) { + if (dbType == DatabaseProduct.ORACLE) { + // Oracle requires special treatment... as usual. + tableValue = "TO_DATE(" + tableValue + ", 'YYYY-MM-DD')"; + } else { + tableValue = "cast(" + tableValue + " as date)"; + } + } + + // Workaround for HIVE_DEFAULT_PARTITION - ignore it like JDO does, for now. + String tableValue0 = tableValue; + tableValue = "(case when " + tableColumn + " <> ?"; + params.add(defaultPartName); + + if (dbHasJoinCastBug) { + // This is a workaround for DERBY-6358 and Oracle bug; it is pretty horrible. + tableValue += (" and " + TBLS + ".\"TBL_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + + "\"FILTER" + partColIndex + "\".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\" and " + + "\"FILTER" + partColIndex + "\".\"INTEGER_IDX\" = " + partColIndex); + params.add(table.getTableName().toLowerCase()); + params.add(table.getDbName().toLowerCase()); + } + tableValue += " then " + tableValue0 + " else null end)"; + } + if (!node.isReverseOrder) { + params.add(nodeValue); + } + + filterBuffer.append(node.isReverseOrder + ? "(? " + node.operator.getSqlOp() + " " + tableValue + ")" + : "(" + tableValue + " " + node.operator.getSqlOp() + " ?)"); + } + } + + /** + * Retrieve the column statistics for the specified columns of the table. NULL + * is returned if the columns are not provided. + * @param dbName the database name of the table + * @param tableName the table name + * @param colNames the list of the column names + * @return the column statistics for the specified columns + * @throws MetaException + */ + public ColumnStatistics getTableStats(final String dbName, final String tableName, + List<String> colNames, boolean enableBitVector) throws MetaException { + if (colNames == null || colNames.isEmpty()) { + return null; + } + final boolean doTrace = LOG.isDebugEnabled(); + final String queryText0 = "select " + getStatsList(enableBitVector) + " from " + TAB_COL_STATS + " " + + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? and \"COLUMN_NAME\" in ("; + Batchable<String, Object[]> b = new Batchable<String, Object[]>() { + @Override + public List<Object[]> run(List<String> input) throws MetaException { + String queryText = queryText0 + makeParams(input.size()) + ")"; + Object[] params = new Object[input.size() + 2]; + params[0] = dbName; + params[1] = tableName; + for (int i = 0; i < input.size(); ++i) { + params[i + 2] = input.get(i); + } + long start = doTrace ? System.nanoTime() : 0; + Query query = pm.newQuery("javax.jdo.query.SQL", queryText); + Object qResult = executeWithArray(query, params, queryText); + timingTrace(doTrace, queryText0 + "...)", start, (doTrace ? 
System.nanoTime() : 0)); + if (qResult == null) { + query.closeAll(); + return null; + } + addQueryAfterUse(query); + return ensureList(qResult); + } + }; + List<Object[]> list = runBatched(colNames, b); + if (list.isEmpty()) { + return null; + } + ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tableName); + ColumnStatistics result = makeColumnStats(list, csd, 0); + b.closeAllQueries(); + return result; + } + + public AggrStats aggrColStatsForPartitions(String dbName, String tableName, + List<String> partNames, List<String> colNames, boolean useDensityFunctionForNDVEstimation, + double ndvTuner, boolean enableBitVector) throws MetaException { + if (colNames.isEmpty() || partNames.isEmpty()) { + LOG.debug("Columns is empty or partNames is empty : Short-circuiting stats eval"); + return new AggrStats(Collections.<ColumnStatisticsObj>emptyList(), 0); // Nothing to aggregate + } + long partsFound = 0; + List<ColumnStatisticsObj> colStatsList; + // Try to read from the cache first + if (isAggregateStatsCacheEnabled + && (partNames.size() < aggrStatsCache.getMaxPartsPerCacheNode())) { + AggrColStats colStatsAggrCached; + List<ColumnStatisticsObj> colStatsAggrFromDB; + int maxPartsPerCacheNode = aggrStatsCache.getMaxPartsPerCacheNode(); + double fpp = aggrStatsCache.getFalsePositiveProbability(); + colStatsList = new ArrayList<ColumnStatisticsObj>(); + // Bloom filter for the new node that we will eventually add to the cache + BloomFilter bloomFilter = createPartsBloomFilter(maxPartsPerCacheNode, fpp, partNames); + boolean computePartsFound = true; + for (String colName : colNames) { + // Check the cache first + colStatsAggrCached = aggrStatsCache.get(dbName, tableName, colName, partNames); + if (colStatsAggrCached != null) { + colStatsList.add(colStatsAggrCached.getColStats()); + partsFound = colStatsAggrCached.getNumPartsCached(); + } else { + if (computePartsFound) { + partsFound = partsFoundForPartitions(dbName, tableName, partNames, colNames); + computePartsFound = false; + } + List<String> colNamesForDB = new ArrayList<String>(); + colNamesForDB.add(colName); + // Read aggregated stats for one column + colStatsAggrFromDB = + columnStatisticsObjForPartitions(dbName, tableName, partNames, colNamesForDB, + partsFound, useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector); + if (!colStatsAggrFromDB.isEmpty()) { + ColumnStatisticsObj colStatsAggr = colStatsAggrFromDB.get(0); + colStatsList.add(colStatsAggr); + // Update the cache to add this new aggregate node + aggrStatsCache.add(dbName, tableName, colName, partsFound, colStatsAggr, bloomFilter); + } + } + } + } else { + partsFound = partsFoundForPartitions(dbName, tableName, partNames, colNames); + colStatsList = + columnStatisticsObjForPartitions(dbName, tableName, partNames, colNames, partsFound, + useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector); + } + LOG.info("useDensityFunctionForNDVEstimation = " + useDensityFunctionForNDVEstimation + + "\npartsFound = " + partsFound + "\nColumnStatisticsObj = " + + Arrays.toString(colStatsList.toArray())); + return new AggrStats(colStatsList, partsFound); + } + + private BloomFilter createPartsBloomFilter(int maxPartsPerCacheNode, double fpp, + List<String> partNames) { + BloomFilter bloomFilter = new BloomFilter(maxPartsPerCacheNode, fpp); + for (String partName : partNames) { + bloomFilter.add(partName.getBytes()); + } + return bloomFilter; + } + + private long partsFoundForPartitions(final String dbName, final String tableName, + final List<String> 
partNames, List<String> colNames) throws MetaException { + assert !colNames.isEmpty() && !partNames.isEmpty(); + final boolean doTrace = LOG.isDebugEnabled(); + final String queryText0 = "select count(\"COLUMN_NAME\") from " + PART_COL_STATS + "" + + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? " + + " and \"COLUMN_NAME\" in (%1$s) and \"PARTITION_NAME\" in (%2$s)" + + " group by \"PARTITION_NAME\""; + List<Long> allCounts = runBatched(colNames, new Batchable<String, Long>() { + @Override + public List<Long> run(final List<String> inputColName) throws MetaException { + return runBatched(partNames, new Batchable<String, Long>() { + @Override + public List<Long> run(List<String> inputPartNames) throws MetaException { + long partsFound = 0; + String queryText = String.format(queryText0, + makeParams(inputColName.size()), makeParams(inputPartNames.size())); + long start = doTrace ? System.nanoTime() : 0; + Query query = pm.newQuery("javax.jdo.query.SQL", queryText); + try { + Object qResult = executeWithArray(query, prepareParams( + dbName, tableName, inputPartNames, inputColName), queryText); + long end = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, end); + ForwardQueryResult<?> fqr = (ForwardQueryResult<?>) qResult; + Iterator<?> iter = fqr.iterator(); + while (iter.hasNext()) { + if (extractSqlLong(iter.next()) == inputColName.size()) { + partsFound++; + } + } + return Lists.<Long>newArrayList(partsFound); + } finally { + query.closeAll(); + } + } + }); + } + }); + long partsFound = 0; + for (Long val : allCounts) { + partsFound += val; + } + return partsFound; + } + + private List<ColumnStatisticsObj> columnStatisticsObjForPartitions(final String dbName, + final String tableName, final List<String> partNames, List<String> colNames, long partsFound, + final boolean useDensityFunctionForNDVEstimation, final double ndvTuner, final boolean enableBitVector) throws MetaException { + final boolean areAllPartsFound = (partsFound == partNames.size()); + return runBatched(colNames, new Batchable<String, ColumnStatisticsObj>() { + @Override + public List<ColumnStatisticsObj> run(final List<String> inputColNames) throws MetaException { + return runBatched(partNames, new Batchable<String, ColumnStatisticsObj>() { + @Override + public List<ColumnStatisticsObj> run(List<String> inputPartNames) throws MetaException { + return columnStatisticsObjForPartitionsBatch(dbName, tableName, inputPartNames, + inputColNames, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector); + } + }); + } + }); + } + + // Get aggregated column stats for a table per partition for all columns in the partition + // This is primarily used to populate stats object when using CachedStore (Check CachedStore#prewarm) + public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName, + String tblName, boolean enableBitVector) throws MetaException { + String queryText = "select \"PARTITION_NAME\", " + getStatsList(enableBitVector) + " from " + + " " + PART_COL_STATS + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ?" + + " order by \"PARTITION_NAME\""; + long start = 0; + long end = 0; + Query query = null; + boolean doTrace = LOG.isDebugEnabled(); + Object qResult = null; + start = doTrace ? 
+  // Get aggregated column stats for a table per partition for all columns in the partition.
+  // This is primarily used to populate the stats object when using CachedStore (check
+  // CachedStore#prewarm).
+  public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
+      String tblName, boolean enableBitVector) throws MetaException {
+    String queryText = "select \"PARTITION_NAME\", " + getStatsList(enableBitVector) + " from "
+        + " " + PART_COL_STATS + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ?"
+        + " order by \"PARTITION_NAME\"";
+    long start = 0;
+    long end = 0;
+    Query query = null;
+    boolean doTrace = LOG.isDebugEnabled();
+    Object qResult = null;
+    start = doTrace ? System.nanoTime() : 0;
+    query = pm.newQuery("javax.jdo.query.SQL", queryText);
+    qResult = executeWithArray(query, prepareParams(dbName, tblName,
+        Collections.<String>emptyList(), Collections.<String>emptyList()), queryText);
+    if (qResult == null) {
+      query.closeAll();
+      return Collections.emptyMap();
+    }
+    end = doTrace ? System.nanoTime() : 0;
+    timingTrace(doTrace, queryText, start, end);
+    List<Object[]> list = ensureList(qResult);
+    Map<String, List<ColumnStatisticsObj>> partColStatsMap =
+        new HashMap<String, List<ColumnStatisticsObj>>();
+    String partNameCurrent = null;
+    List<ColumnStatisticsObj> partColStatsList = new ArrayList<ColumnStatisticsObj>();
+    for (Object[] row : list) {
+      String partName = (String) row[0];
+      if (partNameCurrent == null) {
+        // Update the current partition we are working on
+        partNameCurrent = partName;
+        // Create a new list for this new partition
+        partColStatsList = new ArrayList<ColumnStatisticsObj>();
+        // Add the col stat for the current column
+        partColStatsList.add(prepareCSObj(row, 1));
+      } else if (!partNameCurrent.equalsIgnoreCase(partName)) {
+        // Save the previous partition and its col stat list
+        partColStatsMap.put(partNameCurrent, partColStatsList);
+        // Update the current partition we are working on
+        partNameCurrent = partName;
+        // Create a new list for this new partition
+        partColStatsList = new ArrayList<ColumnStatisticsObj>();
+        // Add the col stat for the current column
+        partColStatsList.add(prepareCSObj(row, 1));
+      } else {
+        partColStatsList.add(prepareCSObj(row, 1));
+      }
+      Deadline.checkTimeout();
+    }
+    query.closeAll();
+    return partColStatsMap;
+  }
+
+  /** Should be called with the list short enough to not trip up Oracle/etc. */
+  private List<ColumnStatisticsObj> columnStatisticsObjForPartitionsBatch(String dbName,
+      String tableName, List<String> partNames, List<String> colNames, boolean areAllPartsFound,
+      boolean useDensityFunctionForNDVEstimation, double ndvTuner, boolean enableBitVector)
+      throws MetaException {
+    if (enableBitVector) {
+      return aggrStatsUseJava(dbName, tableName, partNames, colNames,
+          useDensityFunctionForNDVEstimation, ndvTuner);
+    } else {
+      return aggrStatsUseDB(dbName, tableName, partNames, colNames, areAllPartsFound,
+          useDensityFunctionForNDVEstimation, ndvTuner);
+    }
+  }
+
+  private List<ColumnStatisticsObj> aggrStatsUseJava(String dbName, String tableName,
+      List<String> partNames, List<String> colNames,
+      boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
+    // 1. get all the stats for colNames in partNames;
+    List<ColumnStatistics> partStats = getPartitionStats(dbName, tableName, partNames, colNames,
+        true);
+    // 2. use util function to aggr stats
+    return MetaStoreUtils.aggrPartitionStats(partStats, dbName, tableName, partNames, colNames,
+        useDensityFunctionForNDVEstimation, ndvTuner);
+  }
+
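+  // The split in columnStatisticsObjForPartitionsBatch is between two aggregation strategies:
+  // with bit vectors enabled, the per-partition stats are fetched raw and merged in Java
+  // (aggrStatsUseJava above), presumably because the NDV bit vectors cannot be combined by
+  // ANSI SQL aggregate functions; otherwise the whole aggregation is pushed down to the
+  // database (aggrStatsUseDB below) as one grouped query per batch.
+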
+  private List<ColumnStatisticsObj> aggrStatsUseDB(String dbName,
+      String tableName, List<String> partNames, List<String> colNames, boolean areAllPartsFound,
+      boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
+    // TODO: all the extrapolation logic should be moved out of this class,
+    // only mechanical data retrieval should remain here.
+    String commonPrefix = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", "
+        + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), "
+        + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), "
+        + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), "
+        + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), "
+        // The following data is used to compute a partitioned table's NDV based on the
+        // partitions' NDVs when useDensityFunctionForNDVEstimation = true. Global NDVs cannot be
+        // accurately derived from partition NDVs, because the domains of column values in two
+        // partitions can overlap. If there is no overlap, the global NDV is just the sum of the
+        // partition NDVs (UpperBound). If there is some overlap, the global NDV can be anywhere
+        // between that sum and the largest single partition NDV, which is reached when the
+        // domains of all other partitions are subsets of one partition's domain (LowerBound).
+        // Under a uniform distribution, though, we can roughly estimate the global NDV by
+        // leveraging the min/max values, and we guarantee that the estimate makes sense by
+        // comparing it to the UpperBound (calculated by "sum(\"NUM_DISTINCTS\")") and the
+        // LowerBound (calculated by "max(\"NUM_DISTINCTS\")").
+        + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal)),"
+        + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
+        + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
+        + "sum(\"NUM_DISTINCTS\")" + " from " + PART_COL_STATS + ""
+        + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? ";
+    String queryText = null;
+    long start = 0;
+    long end = 0;
+    Query query = null;
+    boolean doTrace = LOG.isDebugEnabled();
+    Object qResult = null;
+    ForwardQueryResult<?> fqr = null;
+    // Check if the stats for all the columns of all the partitions exist; if so,
+    // extrapolation is not needed.
+    if (areAllPartsFound) {
+      queryText = commonPrefix + " and \"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")"
+          + " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
+          + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
+      start = doTrace ? System.nanoTime() : 0;
+      query = pm.newQuery("javax.jdo.query.SQL", queryText);
+      qResult = executeWithArray(query, prepareParams(dbName, tableName, partNames, colNames),
+          queryText);
+      if (qResult == null) {
+        query.closeAll();
+        return Collections.emptyList();
+      }
+      end = doTrace ? System.nanoTime() : 0;
+      timingTrace(doTrace, queryText, start, end);
+      List<Object[]> list = ensureList(qResult);
+      List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(list.size());
+      for (Object[] row : list) {
+        colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation,
+            ndvTuner));
+        Deadline.checkTimeout();
+      }
+      query.closeAll();
+      return colStats;
+    } else {
+      // Extrapolation is needed for some columns: the stats for at least one column in at
+      // least one partition are missing. We extrapolate the missing partitions based on the
+      // partitions that do have stats.
+      List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(colNames.size());
+      queryText = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", count(\"PARTITION_NAME\") "
+          + " from " + PART_COL_STATS
+          + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? "
+          + " and \"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")"
+          + " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
+          + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
+      start = doTrace ? System.nanoTime() : 0;
+      query = pm.newQuery("javax.jdo.query.SQL", queryText);
+      qResult = executeWithArray(query, prepareParams(dbName, tableName, partNames, colNames),
+          queryText);
+      end = doTrace ? System.nanoTime() : 0;
+      timingTrace(doTrace, queryText, start, end);
+      if (qResult == null) {
+        query.closeAll();
+        return Collections.emptyList();
+      }
+      List<String> noExtraColumnNames = new ArrayList<String>();
+      Map<String, String[]> extraColumnNameTypeParts = new HashMap<String, String[]>();
+      List<Object[]> list = ensureList(qResult);
+      for (Object[] row : list) {
+        String colName = (String) row[0];
+        String colType = (String) row[1];
+        // Extrapolation is not needed for this column if
+        // count(\"PARTITION_NAME\") == partNames.size(), and it is
+        // not possible for this column if count(\"PARTITION_NAME\") < 2.
+        Long count = extractSqlLong(row[2]);
+        if (count == partNames.size() || count < 2) {
+          noExtraColumnNames.add(colName);
+        } else {
+          extraColumnNameTypeParts.put(colName, new String[] { colType, String.valueOf(count) });
+        }
+        Deadline.checkTimeout();
+      }
+      query.closeAll();
+      // Extrapolation is not needed for the columns in noExtraColumnNames.
+      if (noExtraColumnNames.size() != 0) {
+        queryText = commonPrefix + " and \"COLUMN_NAME\" in ("
+            + makeParams(noExtraColumnNames.size()) + ")" + " and \"PARTITION_NAME\" in ("
+            + makeParams(partNames.size()) + ")" + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
+        start = doTrace ? System.nanoTime() : 0;
+        query = pm.newQuery("javax.jdo.query.SQL", queryText);
+        qResult = executeWithArray(query,
+            prepareParams(dbName, tableName, partNames, noExtraColumnNames), queryText);
+        if (qResult == null) {
+          query.closeAll();
+          return Collections.emptyList();
+        }
+        list = ensureList(qResult);
+        for (Object[] row : list) {
+          colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation,
+              ndvTuner));
+          Deadline.checkTimeout();
+        }
+        end = doTrace ? System.nanoTime() : 0;
+        timingTrace(doTrace, queryText, start, end);
+        query.closeAll();
+      }
+      // Extrapolation is needed for the columns in extraColumnNameTypeParts.
+      // Give a sequence number to all the partitions.
+      if (extraColumnNameTypeParts.size() != 0) {
+        Map<String, Integer> indexMap = new HashMap<String, Integer>();
+        for (int index = 0; index < partNames.size(); index++) {
+          indexMap.put(partNames.get(index), index);
+        }
+        // Get the sums for all columns at once to reduce the number of queries.
+        Map<String, Map<Integer, Object>> sumMap = new HashMap<String, Map<Integer, Object>>();
+        queryText = "select \"COLUMN_NAME\", sum(\"NUM_NULLS\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), sum(\"NUM_DISTINCTS\")"
+            + " from " + PART_COL_STATS + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? "
+            + " and \"COLUMN_NAME\" in (" + makeParams(extraColumnNameTypeParts.size())
+            + ") and \"PARTITION_NAME\" in (" + makeParams(partNames.size())
+            + ") group by \"COLUMN_NAME\"";
+        start = doTrace ? System.nanoTime() : 0;
+        query = pm.newQuery("javax.jdo.query.SQL", queryText);
+        List<String> extraColumnNames = new ArrayList<String>();
+        extraColumnNames.addAll(extraColumnNameTypeParts.keySet());
+        qResult = executeWithArray(query,
+            prepareParams(dbName, tableName, partNames, extraColumnNames), queryText);
+        if (qResult == null) {
+          query.closeAll();
+          return Collections.emptyList();
+        }
+        list = ensureList(qResult);
+        // See the indexes for the col stats in IExtrapolatePartStatus.
+        Integer[] sumIndex = new Integer[] { 6, 10, 11, 15 };
+        for (Object[] row : list) {
+          Map<Integer, Object> indexToObject = new HashMap<Integer, Object>();
+          for (int ind = 1; ind < row.length; ind++) {
+            indexToObject.put(sumIndex[ind - 1], row[ind]);
+          }
+          // row[0] is the column name
+          sumMap.put((String) row[0], indexToObject);
+          Deadline.checkTimeout();
+        }
+        end = doTrace ? System.nanoTime() : 0;
+        timingTrace(doTrace, queryText, start, end);
+        query.closeAll();
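+        // Sum-type stats are then scaled up from the covered partitions to all requested ones
+        // as val / sumVal * partNames.size(), where sumVal is the number of partitions that
+        // have stats for the column. E.g. sum(NUM_NULLS) = 30 over 3 covered partitions and 12
+        // requested partitions extrapolates to 30 / 3 * 12 = 120. Note that this is Long
+        // arithmetic, so val / sumVal truncates before the multiplication.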
+        for (Map.Entry<String, String[]> entry : extraColumnNameTypeParts.entrySet()) {
+          Object[] row = new Object[IExtrapolatePartStatus.colStatNames.length + 2];
+          String colName = entry.getKey();
+          String colType = entry.getValue()[0];
+          Long sumVal = Long.parseLong(entry.getValue()[1]);
+          // Fill in the column name.
+          row[0] = colName;
+          // Fill in the column type.
+          row[1] = colType;
+          // Use linear extrapolation; a more sophisticated method can be added in the future.
+          IExtrapolatePartStatus extrapolateMethod = new LinearExtrapolatePartStatus();
+          // Fill in the column stats.
+          Integer[] index = null;
+          boolean decimal = false;
+          if (colType.toLowerCase().startsWith("decimal")) {
+            index = IExtrapolatePartStatus.indexMaps.get("decimal");
+            decimal = true;
+          } else {
+            index = IExtrapolatePartStatus.indexMaps.get(colType.toLowerCase());
+          }
+          // If colType is not a known type (long, double, etc.), use all the indexes.
+          if (index == null) {
+            index = IExtrapolatePartStatus.indexMaps.get("default");
+          }
+          for (int colStatIndex : index) {
+            String colStatName = IExtrapolatePartStatus.colStatNames[colStatIndex];
+            // If the aggregation type is sum, we do a scale-up.
+            if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Sum) {
+              Object o = sumMap.get(colName).get(colStatIndex);
+              if (o == null) {
+                row[2 + colStatIndex] = null;
+              } else {
+                Long val = extractSqlLong(o);
+                row[2 + colStatIndex] = val / sumVal * (partNames.size());
+              }
+            } else if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Min
+                || IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Max) {
+              // If the aggregation type is min/max, we extrapolate from the
+              // left/right borders.
+              if (!decimal) {
+                queryText = "select \"" + colStatName
+                    + "\",\"PARTITION_NAME\" from " + PART_COL_STATS
+                    + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ?" + " and \"COLUMN_NAME\" = ?"
+                    + " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
+                    + " order by \"" + colStatName + "\"";
+              } else {
+                queryText = "select \"" + colStatName
+                    + "\",\"PARTITION_NAME\" from " + PART_COL_STATS
+                    + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ?" + " and \"COLUMN_NAME\" = ?"
+                    + " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
+                    + " order by cast(\"" + colStatName + "\" as decimal)";
+              }
+              start = doTrace ? System.nanoTime() : 0;
+              query = pm.newQuery("javax.jdo.query.SQL", queryText);
+              qResult = executeWithArray(query,
+                  prepareParams(dbName, tableName, partNames, Arrays.asList(colName)), queryText);
+              if (qResult == null) {
+                query.closeAll();
+                return Collections.emptyList();
+              }
+              fqr = (ForwardQueryResult<?>) qResult;
+              Object[] min = (Object[]) (fqr.get(0));
+              Object[] max = (Object[]) (fqr.get(fqr.size() - 1));
+              end = doTrace ? System.nanoTime() : 0;
+              timingTrace(doTrace, queryText, start, end);
+              query.closeAll();
+              if (min[0] == null || max[0] == null) {
+                row[2 + colStatIndex] = null;
+              } else {
+                row[2 + colStatIndex] = extrapolateMethod.extrapolate(min, max, colStatIndex,
+                    indexMap);
+              }
+            } else {
+              // If the aggregation type is avg, we use the average of the existing values.
+              queryText = "select "
+                  + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal)),"
+                  + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
+                  + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")"
+                  + " from " + PART_COL_STATS + "" + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ?"
+                  + " and \"COLUMN_NAME\" = ?" + " and \"PARTITION_NAME\" in ("
+                  + makeParams(partNames.size()) + ")" + " group by \"COLUMN_NAME\"";
+              start = doTrace ? System.nanoTime() : 0;
+              query = pm.newQuery("javax.jdo.query.SQL", queryText);
+              qResult = executeWithArray(query,
+                  prepareParams(dbName, tableName, partNames, Arrays.asList(colName)), queryText);
+              if (qResult == null) {
+                query.closeAll();
+                return Collections.emptyList();
+              }
+              fqr = (ForwardQueryResult<?>) qResult;
+              Object[] avg = (Object[]) (fqr.get(0));
+              // colStatIndex = 12, 13 and 14 correspond to "AVG_LONG", "AVG_DOUBLE" and
+              // "AVG_DECIMAL".
+              row[2 + colStatIndex] = avg[colStatIndex - 12];
+              end = doTrace ? System.nanoTime() : 0;
+              timingTrace(doTrace, queryText, start, end);
+              query.closeAll();
+            }
+          }
+          colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation,
+              ndvTuner));
+          Deadline.checkTimeout();
+        }
+      }
+      return colStats;
+    }
+  }
+
+  private ColumnStatisticsObj prepareCSObj(Object[] row, int i) throws MetaException {
+    ColumnStatisticsData data = new ColumnStatisticsData();
+    ColumnStatisticsObj cso = new ColumnStatisticsObj((String) row[i++], (String) row[i++], data);
+    Object llow = row[i++], lhigh = row[i++], dlow = row[i++], dhigh = row[i++],
+        declow = row[i++], dechigh = row[i++], nulls = row[i++], dist = row[i++],
+        bitVector = row[i++], avglen = row[i++], maxlen = row[i++], trues = row[i++],
+        falses = row[i++];
+    StatObjectConverter.fillColumnStatisticsData(cso.getColType(), data,
+        llow, lhigh, dlow, dhigh, declow, dechigh, nulls, dist, bitVector, avglen, maxlen,
+        trues, falses);
+    return cso;
+  }
+
+  private ColumnStatisticsObj prepareCSObjWithAdjustedNDV(Object[] row, int i,
+      boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
+    ColumnStatisticsData data = new ColumnStatisticsData();
+    ColumnStatisticsObj cso = new ColumnStatisticsObj((String) row[i++], (String) row[i++], data);
+    Object llow = row[i++], lhigh = row[i++], dlow = row[i++], dhigh = row[i++],
+        declow = row[i++], dechigh = row[i++], nulls = row[i++], dist = row[i++],
+        avglen = row[i++], maxlen = row[i++], trues = row[i++], falses = row[i++],
+        avgLong = row[i++], avgDouble = row[i++], avgDecimal = row[i++], sumDist = row[i++];
+    StatObjectConverter.fillColumnStatisticsData(cso.getColType(), data, llow, lhigh, dlow, dhigh,
+        declow, dechigh, nulls, dist, avglen, maxlen, trues, falses, avgLong, avgDouble,
+        avgDecimal, sumDist, useDensityFunctionForNDVEstimation, ndvTuner);
+    return cso;
+  }
+
+  private Object[] prepareParams(String dbName, String tableName, List<String> partNames,
+      List<String> colNames) throws MetaException {
+    Object[] params = new Object[colNames.size() + partNames.size() + 2];
+    int paramI = 0;
+    params[paramI++] = dbName;
+    params[paramI++] = tableName;
+    for (String colName : colNames) {
+      params[paramI++] = colName;
+    }
+    for (String partName : partNames) {
+      params[paramI++] = partName;
+    }
+    return params;
+  }
+
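+  // Positional contract: prepareCSObj consumes rows in exactly the order produced by
+  // getStatsList below (COLUMN_NAME, COLUMN_TYPE, the long/double/decimal low and high values,
+  // NUM_NULLS, NUM_DISTINCTS, BIT_VECTOR or '' when disabled, AVG_COL_LEN, MAX_COL_LEN,
+  // NUM_TRUES, NUM_FALSES); LAST_ANALYZED is not consumed there but is read by makeColumnStats
+  // at offset + 15. prepareCSObjWithAdjustedNDV instead matches the aggregate select list
+  // built in aggrStatsUseDB (no bit vector, plus the three avg-density columns and
+  // sum(NUM_DISTINCTS)).
+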
+  public List<ColumnStatistics> getPartitionStats(final String dbName, final String tableName,
+      final List<String> partNames, List<String> colNames, boolean enableBitVector)
+      throws MetaException {
+    if (colNames.isEmpty() || partNames.isEmpty()) {
+      return Collections.emptyList();
+    }
+    final boolean doTrace = LOG.isDebugEnabled();
+    final String queryText0 = "select \"PARTITION_NAME\", " + getStatsList(enableBitVector) + " from "
+        + " " + PART_COL_STATS + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? and \"COLUMN_NAME\""
+        + " in (%1$s) AND \"PARTITION_NAME\" in (%2$s) order by \"PARTITION_NAME\"";
+    Batchable<String, Object[]> b = new Batchable<String, Object[]>() {
+      @Override
+      public List<Object[]> run(final List<String> inputColNames) throws MetaException {
+        Batchable<String, Object[]> b2 = new Batchable<String, Object[]>() {
+          @Override
+          public List<Object[]> run(List<String> inputPartNames) throws MetaException {
+            String queryText = String.format(queryText0,
+                makeParams(inputColNames.size()), makeParams(inputPartNames.size()));
+            long start = doTrace ? System.nanoTime() : 0;
+            Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
+            Object qResult = executeWithArray(query, prepareParams(
+                dbName, tableName, inputPartNames, inputColNames), queryText);
+            timingTrace(doTrace, queryText0, start, (doTrace ? System.nanoTime() : 0));
+            if (qResult == null) {
+              query.closeAll();
+              return Collections.emptyList();
+            }
+            addQueryAfterUse(query);
+            return ensureList(qResult);
+          }
+        };
+        try {
+          return runBatched(partNames, b2);
+        } finally {
+          addQueryAfterUse(b2);
+        }
+      }
+    };
+    List<Object[]> list = runBatched(colNames, b);
+
+    List<ColumnStatistics> result = new ArrayList<ColumnStatistics>(
+        Math.min(list.size(), partNames.size()));
+    String lastPartName = null;
+    int from = 0;
+    for (int i = 0; i <= list.size(); ++i) {
+      boolean isLast = i == list.size();
+      String partName = isLast ? null : (String) list.get(i)[0];
+      if (!isLast && partName.equals(lastPartName)) {
+        continue;
+      } else if (from != i) {
+        ColumnStatisticsDesc csd = new ColumnStatisticsDesc(false, dbName, tableName);
+        csd.setPartName(lastPartName);
+        result.add(makeColumnStats(list.subList(from, i), csd, 1));
+      }
+      lastPartName = partName;
+      from = i;
+      Deadline.checkTimeout();
+    }
+    b.closeAllQueries();
+    return result;
+  }
+
+  /** The common query part for table and partition stats */
+  private final String getStatsList(boolean enableBitVector) {
+    return "\"COLUMN_NAME\", \"COLUMN_TYPE\", \"LONG_LOW_VALUE\", \"LONG_HIGH_VALUE\", "
+        + "\"DOUBLE_LOW_VALUE\", \"DOUBLE_HIGH_VALUE\", \"BIG_DECIMAL_LOW_VALUE\", "
+        + "\"BIG_DECIMAL_HIGH_VALUE\", \"NUM_NULLS\", \"NUM_DISTINCTS\", "
+        + (enableBitVector ? "\"BIT_VECTOR\", " : "\'\', ") + "\"AVG_COL_LEN\", "
+        + "\"MAX_COL_LEN\", \"NUM_TRUES\", \"NUM_FALSES\", \"LAST_ANALYZED\" ";
+  }
+
+  private ColumnStatistics makeColumnStats(
+      List<Object[]> list, ColumnStatisticsDesc csd, int offset) throws MetaException {
+    ColumnStatistics result = new ColumnStatistics();
+    result.setStatsDesc(csd);
+    List<ColumnStatisticsObj> csos = new ArrayList<ColumnStatisticsObj>(list.size());
+    for (Object[] row : list) {
+      // LastAnalyzed is stored per column but thrift has it per several;
+      // get the lowest for now as nobody actually uses this field.
+      Object laObj = row[offset + 15];
+      if (laObj != null && (!csd.isSetL
<TRUNCATED>
