http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java
deleted file mode 100644
index 27d25e1..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java
+++ /dev/null
@@ -1,1268 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.UUID;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
-import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.FunctionType;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.ResourceType;
-import org.apache.hadoop.hive.metastore.api.ResourceUri;
-import org.apache.hadoop.hive.metastore.api.UnknownDBException;
-import org.apache.hadoop.hive.ql.exec.FunctionUtils;
-import org.apache.log4j.Logger;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.TException;
-
-import com.cloudera.impala.analysis.TableName;
-import com.cloudera.impala.authorization.SentryConfig;
-import com.cloudera.impala.catalog.MetaStoreClientPool.MetaStoreClient;
-import com.cloudera.impala.common.FileSystemUtil;
-import com.cloudera.impala.common.ImpalaException;
-import com.cloudera.impala.common.ImpalaRuntimeException;
-import com.cloudera.impala.common.JniUtil;
-import com.cloudera.impala.common.Pair;
-import com.cloudera.impala.hive.executor.UdfExecutor;
-import com.cloudera.impala.thrift.TCatalog;
-import com.cloudera.impala.thrift.TCatalogObject;
-import com.cloudera.impala.thrift.TCatalogObjectType;
-import com.cloudera.impala.thrift.TFunction;
-import com.cloudera.impala.thrift.TFunctionBinaryType;
-import com.cloudera.impala.thrift.TGetAllCatalogObjectsResponse;
-import com.cloudera.impala.thrift.TPartitionKeyValue;
-import com.cloudera.impala.thrift.TPrivilege;
-import com.cloudera.impala.thrift.TTable;
-import com.cloudera.impala.thrift.TTableName;
-import com.cloudera.impala.thrift.TUniqueId;
-import com.cloudera.impala.util.PatternMatcher;
-import com.cloudera.impala.util.SentryProxy;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.io.Files;
-
-/**
- * Specialized Catalog that implements the CatalogService specific Catalog
- * APIs. The CatalogServiceCatalog manages loading of all the catalog metadata
- * and processing of DDL requests. For each DDL request, the CatalogServiceCatalog
- * will return the catalog version that the update will show up in. The client
- * can then wait until the statestore sends an update that contains that catalog
- * version.
- * The CatalogServiceCatalog also manages a global "catalog version". The version
- * is incremented and assigned to a CatalogObject whenever it is
- * added/modified/removed from the catalog. This means each CatalogObject will have a
- * unique version and assigned versions are strictly increasing.
- *
- * Table metadata for IncompleteTables (not fully loaded tables) is loaded in the
- * background by the TableLoadingMgr; tables can be prioritized for loading by calling
- * prioritizeLoad(). Background loading can also be enabled for the catalog, in which
- * case missing tables (tables that are not yet loaded) are submitted to the
- * TableLoadingMgr whenever table metadata is invalidated, and on startup. The metadata
- * of fully loaded tables (e.g. HdfsTable, HBaseTable, etc) is updated in-place and
- * doesn't trigger a background metadata load through the TableLoadingMgr. Accessing a
- * table that is not yet loaded (via getTable()) will load the table's metadata
- * on-demand, out-of-band of the table loading thread pool.
- *
- * See the class comments in CatalogOpExecutor for a description of the locking protocol
- * that should be employed if both the catalog lock and table locks need to be held at
- * the same time.
- *
- * TODO: Consider removing on-demand loading and have everything go through the table
- * loading thread pool.
- */
-public class CatalogServiceCatalog extends Catalog {
-  private static final Logger LOG = Logger.getLogger(CatalogServiceCatalog.class);
-
-  private final TUniqueId catalogServiceId_;
-
-  // Fair lock used to synchronize reads/writes of catalogVersion_. Because this lock
-  // protects catalogVersion_, it can be used to perform atomic bulk catalog operations
-  // since catalogVersion_ cannot change externally while the lock is being held.
-  // In addition to protecting catalogVersion_, it is currently used for the
-  // following bulk operations:
-  // * Building a delta update to send to the statestore in getCatalogObjects(),
-  //   so a snapshot of the catalog can be taken without any version changes.
-  // * During a catalog invalidation (call to reset()), which re-reads all dbs and
-  //   tables from the metastore.
-  // * During renameTable(), because a table must be removed and added to the catalog
-  //   atomically (potentially in a different database).
-  private final ReentrantReadWriteLock catalogLock_ = new ReentrantReadWriteLock(true);
-
-  // Last assigned catalog version.
-  // Starts at INITIAL_CATALOG_VERSION and is incremented with each update to the
-  // Catalog. Continued across the lifetime of the object.
-  // Protected by catalogLock_.
-  // TODO: Handle overflow of catalogVersion_ and nextTableId_.
-  // TODO: The name of this variable is misleading and can be interpreted as a property
-  // of the catalog server. Rename into something that indicates its role as a global
-  // sequence number assigned to catalog objects.
-  private long catalogVersion_ = INITIAL_CATALOG_VERSION;
-
-  protected final AtomicInteger nextTableId_ = new AtomicInteger(0);
-
-  // Manages the scheduling of background table loading.
-  private final TableLoadingMgr tableLoadingMgr_;
-
-  private final boolean loadInBackground_;
-
-  // Periodically polls HDFS to get the latest set of known cache pools.
-  private final ScheduledExecutorService cachePoolReader_ =
-      Executors.newScheduledThreadPool(1);
-
-  // Proxy to access the Sentry Service and also periodically refreshes the
-  // policy metadata. Null if the Sentry Service is not enabled.
-  private final SentryProxy sentryProxy_;
-
-  // Local temporary directory to copy UDF Jars.
-  private static final String LOCAL_LIBRARY_PATH = new String("file://" +
-      System.getProperty("java.io.tmpdir"));
-
-  /**
-   * Initializes the CatalogServiceCatalog. If loadInBackground is true, table metadata
-   * will be loaded in the background.
-   */
-  public CatalogServiceCatalog(boolean loadInBackground, int numLoadingThreads,
-      SentryConfig sentryConfig, TUniqueId catalogServiceId, String kerberosPrincipal) {
-    super(true);
-    catalogServiceId_ = catalogServiceId;
-    tableLoadingMgr_ = new TableLoadingMgr(this, numLoadingThreads);
-    loadInBackground_ = loadInBackground;
-    try {
-      // We want only 'true' HDFS filesystems to poll the HDFS cache (i.e. not S3,
-      // local, etc.)
-      if (FileSystemUtil.getDefaultFileSystem() instanceof DistributedFileSystem) {
-        cachePoolReader_.scheduleAtFixedRate(
-            new CachePoolReader(), 0, 1, TimeUnit.MINUTES);
-      }
-    } catch (IOException e) {
-      LOG.error("Couldn't identify the default FS. Cache Pool reader will be disabled.");
-    }
-    if (sentryConfig != null) {
-      sentryProxy_ = new SentryProxy(sentryConfig, this, kerberosPrincipal);
-    } else {
-      sentryProxy_ = null;
-    }
-  }
-
-  /**
-   * Reads the current set of cache pools from HDFS and updates the catalog.
-   * Called periodically by the cachePoolReader_.
-   */
-  protected class CachePoolReader implements Runnable {
-
-    /**
-     * This constructor is needed to create a non-threaded execution of the class.
-     */
-    public CachePoolReader() {
-      super();
-    }
-
-    public void run() {
-      LOG.trace("Reloading cache pool names from HDFS");
-      // Map of cache pool name to CachePoolInfo. Stored in a map to allow Set
-      // operations to be performed on the keys.
-      Map<String, CachePoolInfo> currentCachePools = Maps.newHashMap();
-      try {
-        DistributedFileSystem dfs = FileSystemUtil.getDistributedFileSystem();
-        RemoteIterator<CachePoolEntry> itr = dfs.listCachePools();
-        while (itr.hasNext()) {
-          CachePoolInfo cachePoolInfo = itr.next().getInfo();
-          currentCachePools.put(cachePoolInfo.getPoolName(), cachePoolInfo);
-        }
-      } catch (Exception e) {
-        LOG.error("Error loading cache pools: ", e);
-        return;
-      }
-
-      catalogLock_.writeLock().lock();
-      try {
-        // Determine what has changed relative to what we have cached.
-        Set<String> droppedCachePoolNames = Sets.difference(
-            hdfsCachePools_.keySet(), currentCachePools.keySet());
-        Set<String> createdCachePoolNames = Sets.difference(
-            currentCachePools.keySet(), hdfsCachePools_.keySet());
-        // Add all new cache pools.
-        for (String createdCachePool: createdCachePoolNames) {
-          HdfsCachePool cachePool = new HdfsCachePool(
-              currentCachePools.get(createdCachePool));
-          cachePool.setCatalogVersion(
-              CatalogServiceCatalog.this.incrementAndGetCatalogVersion());
-          hdfsCachePools_.add(cachePool);
-        }
-        // Remove dropped cache pools.
-        for (String cachePoolName: droppedCachePoolNames) {
-          hdfsCachePools_.remove(cachePoolName);
-          CatalogServiceCatalog.this.incrementAndGetCatalogVersion();
-        }
-      } finally {
-        catalogLock_.writeLock().unlock();
-      }
-    }
-  }
-
-  /**
-   * Adds a list of cache directive IDs for the given table name. Asynchronously
-   * refreshes the table metadata once all cache directives complete.
-   */
-  public void watchCacheDirs(List<Long> dirIds, TTableName tblName) {
-    tableLoadingMgr_.watchCacheDirs(dirIds, tblName);
-  }
-
-  /**
-   * Prioritizes the loading of the given list of TCatalogObjects. Currently only
-   * supports loading Table/View metadata, since Db and Function metadata is not
-   * loaded lazily.
-   */
-  public void prioritizeLoad(List<TCatalogObject> objectDescs) {
-    for (TCatalogObject catalogObject: objectDescs) {
-      Preconditions.checkState(catalogObject.isSetTable());
-      TTable table = catalogObject.getTable();
-      tableLoadingMgr_.prioritizeLoad(new TTableName(table.getDb_name().toLowerCase(),
-          table.getTbl_name().toLowerCase()));
-    }
-  }
-
-  /**
-   * Returns all known objects in the Catalog (Tables, Views, Databases, and
-   * Functions). Some metadata may be skipped for objects that have a catalog
-   * version < the specified "fromVersion". Takes a lock on the catalog to ensure this
-   * update contains a consistent snapshot of all items in the catalog. While holding
-   * the catalog lock, it locks each accessed table to protect against concurrent
-   * modifications.
-   */
-  public TGetAllCatalogObjectsResponse getCatalogObjects(long fromVersion) {
-    TGetAllCatalogObjectsResponse resp = new TGetAllCatalogObjectsResponse();
-    resp.setObjects(new ArrayList<TCatalogObject>());
-    resp.setMax_catalog_version(Catalog.INITIAL_CATALOG_VERSION);
-    catalogLock_.readLock().lock();
-    try {
-      for (Db db: getDbs(PatternMatcher.MATCHER_MATCH_ALL)) {
-        TCatalogObject catalogDb = new TCatalogObject(TCatalogObjectType.DATABASE,
-            db.getCatalogVersion());
-        catalogDb.setDb(db.toThrift());
-        resp.addToObjects(catalogDb);
-
-        for (String tblName: db.getAllTableNames()) {
-          TCatalogObject catalogTbl = new TCatalogObject(TCatalogObjectType.TABLE,
-              Catalog.INITIAL_CATALOG_VERSION);
-
-          Table tbl = db.getTable(tblName);
-          if (tbl == null) {
-            LOG.error("Table: " + tblName + " was expected to be in the catalog " +
-                "cache. Skipping table for this update.");
-            continue;
-          }
-
-          // Protect the table from concurrent modifications.
-          synchronized(tbl) {
-            // Only add the extended metadata if this table's version is >=
-            // the fromVersion.
-            if (tbl.getCatalogVersion() >= fromVersion) {
-              try {
-                catalogTbl.setTable(tbl.toThrift());
-              } catch (Exception e) {
-                LOG.debug(String.format("Error calling toThrift() on table %s.%s: %s",
-                    db.getName(), tblName, e.getMessage()), e);
-                continue;
-              }
-              catalogTbl.setCatalog_version(tbl.getCatalogVersion());
-            } else {
-              catalogTbl.setTable(new TTable(db.getName(), tblName));
-            }
-          }
-          resp.addToObjects(catalogTbl);
-        }
-
-        for (Function fn: db.getFunctions(null, new PatternMatcher())) {
-          TCatalogObject function = new TCatalogObject(TCatalogObjectType.FUNCTION,
-              fn.getCatalogVersion());
-          function.setFn(fn.toThrift());
-          resp.addToObjects(function);
-        }
-      }
-
-      for (DataSource dataSource: getDataSources()) {
-        TCatalogObject catalogObj = new TCatalogObject(TCatalogObjectType.DATA_SOURCE,
-            dataSource.getCatalogVersion());
-        catalogObj.setData_source(dataSource.toThrift());
-        resp.addToObjects(catalogObj);
-      }
-      for (HdfsCachePool cachePool: hdfsCachePools_) {
-        TCatalogObject pool = new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL,
-            cachePool.getCatalogVersion());
-        pool.setCache_pool(cachePool.toThrift());
-        resp.addToObjects(pool);
-      }
-
-      // Get all roles.
-      for (Role role: authPolicy_.getAllRoles()) {
-        TCatalogObject thriftRole = new TCatalogObject();
-        thriftRole.setRole(role.toThrift());
-        thriftRole.setCatalog_version(role.getCatalogVersion());
-        thriftRole.setType(role.getCatalogObjectType());
-        resp.addToObjects(thriftRole);
-
-        for (RolePrivilege p: role.getPrivileges()) {
-          TCatalogObject privilege = new TCatalogObject();
-          privilege.setPrivilege(p.toThrift());
-          privilege.setCatalog_version(p.getCatalogVersion());
-          privilege.setType(p.getCatalogObjectType());
-          resp.addToObjects(privilege);
-        }
-      }
-
-      // Each update should contain a single "TCatalog" object which is used to
-      // pass overall state on the catalog, such as the current version and the
-      // catalog service id.
-      TCatalogObject catalog = new TCatalogObject();
-      catalog.setType(TCatalogObjectType.CATALOG);
-      // By setting the catalog version to the latest catalog version at this point,
-      // it ensures impalads will always bump their versions, even in the case where
-      // an object has been dropped.
-      catalog.setCatalog_version(getCatalogVersion());
-      catalog.setCatalog(new TCatalog(catalogServiceId_));
-      resp.addToObjects(catalog);
-
-      // The max version is the max catalog version of all items in the update.
-      resp.setMax_catalog_version(getCatalogVersion());
-      return resp;
-    } finally {
-      catalogLock_.readLock().unlock();
-    }
-  }
-
-  /**
-   * Returns all user defined functions (aggregate and scalar) in the specified
-   * database. Functions are not returned in a defined order.
-   */
-  public List<Function> getFunctions(String dbName) throws DatabaseNotFoundException {
-    Db db = getDb(dbName);
-    if (db == null) {
-      throw new DatabaseNotFoundException("Database does not exist: " + dbName);
-    }
-
-    // Contains a map of overloaded function names to all functions matching that name.
-    HashMap<String, List<Function>> dbFns = db.getAllFunctions();
-    List<Function> fns = new ArrayList<Function>(dbFns.size());
-    for (List<Function> fnOverloads: dbFns.values()) {
-      for (Function fn: fnOverloads) {
-        fns.add(fn);
-      }
-    }
-    return fns;
-  }
-
-  /**
-   * Checks if the Hive function 'fn' is Impala compatible. A function is Impala
-   * compatible iff
-   *
-   * 1. The function is JAVA based,
-   * 2. Has exactly one binary resource associated (we don't support loading
-   *    dependencies yet) and
-   * 3. The binary is of type JAR.
-   *
-   * Returns true if compatible and false otherwise. In case of incompatible
-   * functions, 'incompatMsg' has the reason for the incompatibility.
-   */
-  public static boolean isFunctionCompatible(
-      org.apache.hadoop.hive.metastore.api.Function fn, StringBuilder incompatMsg) {
-    boolean isCompatible = true;
-    if (fn.getFunctionType() != FunctionType.JAVA) {
-      isCompatible = false;
-      incompatMsg.append("Function type: " + fn.getFunctionType().name()
-          + " is not supported. Only " + FunctionType.JAVA.name() + " functions "
-          + "are supported.");
-    } else if (fn.getResourceUrisSize() == 0) {
-      isCompatible = false;
-      incompatMsg.append("No executable binary resource (like a JAR file) is " +
-          "associated with this function. To fix this, recreate the function by " +
-          "specifying a 'location' in the function create statement.");
-    } else if (fn.getResourceUrisSize() != 1) {
-      isCompatible = false;
-      List<String> resourceUris = Lists.newArrayList();
-      for (ResourceUri resource: fn.getResourceUris()) {
-        resourceUris.add(resource.getUri());
-      }
-      incompatMsg.append("Impala does not support multiple Jars for dependencies. "
-          + "(" + Joiner.on(",").join(resourceUris) + ") ");
-    } else if (fn.getResourceUris().get(0).getResourceType() != ResourceType.JAR) {
-      isCompatible = false;
-      incompatMsg.append("Function binary type: " +
-          fn.getResourceUris().get(0).getResourceType().name()
-          + " is not supported. Only " + ResourceType.JAR.name()
-          + " type is supported.");
-    }
-    return isCompatible;
-  }
-
-  /**
-   * Returns a list of Impala Functions, one per compatible "evaluate" method in the
-   * UDF class referred to by the given Java function. This method copies the UDF Jar
-   * referenced by "function" to a temporary file in "LOCAL_LIBRARY_PATH" and loads it
-   * into the JVM. Then we scan all the methods in the class using reflection, extract
-   * those methods and create corresponding Impala functions. Currently Impala supports
-   * only "JAR" files for symbols, and also a single Jar containing all the dependent
-   * classes rather than a set of Jar files.
-   */
-  public static List<Function> extractFunctions(String db,
-      org.apache.hadoop.hive.metastore.api.Function function)
-      throws ImpalaRuntimeException {
-    List<Function> result = Lists.newArrayList();
-    List<String> addedSignatures = Lists.newArrayList();
-    StringBuilder warnMessage = new StringBuilder();
-    if (!isFunctionCompatible(function, warnMessage)) {
-      LOG.warn("Skipping load of incompatible function: " +
-          function.getFunctionName() + ". " + warnMessage.toString());
-      return result;
-    }
-    String jarUri = function.getResourceUris().get(0).getUri();
-    Class<?> udfClass = null;
-    try {
-      Path localJarPath = new Path(LOCAL_LIBRARY_PATH,
-          UUID.randomUUID().toString() + ".jar");
-      try {
-        FileSystemUtil.copyToLocal(new Path(jarUri), localJarPath);
-      } catch (IOException e) {
-        String errorMsg = "Error loading Java function: " + db + "." +
-            function.getFunctionName() + ". Couldn't copy " + jarUri +
-            " to local path: " + localJarPath.toString();
-        LOG.error(errorMsg, e);
-        throw new ImpalaRuntimeException(errorMsg);
-      }
-      URL[] classLoaderUrls = new URL[] {new URL(localJarPath.toString())};
-      URLClassLoader urlClassLoader = new URLClassLoader(classLoaderUrls);
-      udfClass = urlClassLoader.loadClass(function.getClassName());
-      // Check if the class is of UDF type. Currently we don't support other functions.
-      // TODO: Remove this once we support Java UDAF/UDTF.
-      if (FunctionUtils.getUDFClassType(udfClass) != FunctionUtils.UDFClassType.UDF) {
-        LOG.warn("Ignoring load of incompatible Java function: " +
-            function.getFunctionName() + " as " + FunctionUtils.getUDFClassType(udfClass)
-            + " is not a supported type. Only UDFs are supported.");
-        return result;
-      }
-      // Load each method in the UDF class and create the corresponding Impala Function
-      // object.
-      for (Method m: udfClass.getMethods()) {
-        if (!m.getName().equals(UdfExecutor.UDF_FUNCTION_NAME)) continue;
-        Function fn = ScalarFunction.fromHiveFunction(db,
-            function.getFunctionName(), function.getClassName(),
-            m.getParameterTypes(), m.getReturnType(), jarUri);
-        if (fn == null) {
-          LOG.warn("Ignoring incompatible method: " + m.toString() + " during load of " +
-              "Hive UDF: " + function.getFunctionName() + " from " + udfClass);
-          continue;
-        }
-        if (!addedSignatures.contains(fn.signatureString())) {
-          result.add(fn);
-          addedSignatures.add(fn.signatureString());
-        }
-      }
-    } catch (ClassNotFoundException c) {
-      String errorMsg = "Error loading Java function: " + db + "." +
-          function.getFunctionName() + ". Symbol class " + udfClass +
-          " not found in Jar: " + jarUri;
-      LOG.error(errorMsg);
-      throw new ImpalaRuntimeException(errorMsg, c);
-    } catch (Exception e) {
-      LOG.error("Skipping function load: " + function.getFunctionName(), e);
-      throw new ImpalaRuntimeException("Error extracting functions", e);
-    } catch (LinkageError e) {
-      String errorMsg = "Error resolving dependencies for Java function: " + db + "." +
-          function.getFunctionName();
-      LOG.error(errorMsg);
-      throw new ImpalaRuntimeException(errorMsg, e);
-    }
-    return result;
-  }
-
-  /**
-   * Extracts Impala functions stored in metastore db parameters and adds them to
-   * the catalog cache.
-   */
-  private void loadFunctionsFromDbParams(Db db,
-      org.apache.hadoop.hive.metastore.api.Database msDb) {
-    if (msDb == null || msDb.getParameters() == null) return;
-    LOG.info("Loading native functions for database: " + db.getName());
-    TCompactProtocol.Factory protocolFactory = new TCompactProtocol.Factory();
-    for (String key: msDb.getParameters().keySet()) {
-      if (!key.startsWith(Db.FUNCTION_INDEX_PREFIX)) continue;
-      try {
-        TFunction fn = new TFunction();
-        JniUtil.deserializeThrift(protocolFactory, fn,
-            Base64.decodeBase64(msDb.getParameters().get(key)));
-        Function addFn = Function.fromThrift(fn);
-        db.addFunction(addFn, false);
-        addFn.setCatalogVersion(incrementAndGetCatalogVersion());
-      } catch (ImpalaException e) {
-        LOG.error("Encountered an error during function load: key=" + key
-            + ", continuing", e);
-      }
-    }
-  }
-
-  /**
-   * Loads Java functions into the catalog. For each function in "functions",
-   * we extract all Impala compatible evaluate() signatures and load them
-   * as separate functions in the catalog.
-   */
-  private void loadJavaFunctions(Db db,
-      List<org.apache.hadoop.hive.metastore.api.Function> functions) {
-    Preconditions.checkNotNull(functions);
-    LOG.info("Loading Java functions for database: " + db.getName());
-    for (org.apache.hadoop.hive.metastore.api.Function function: functions) {
-      try {
-        for (Function fn: extractFunctions(db.getName(), function)) {
-          db.addFunction(fn);
-          fn.setCatalogVersion(incrementAndGetCatalogVersion());
-        }
-      } catch (Exception e) {
-        LOG.error("Skipping function load: " + function.getFunctionName(), e);
-      }
-    }
-  }
-
-  /**
-   * Invalidates the database 'db'.
-   * This method can have potential race conditions with external changes to the Hive
-   * metastore, and hence any conflicting changes to the objects can manifest in the
-   * form of exceptions from the HMS calls, which are appropriately handled.
-   * Returns the invalidated 'Db' object along with the list of tables to be loaded by
-   * the TableLoadingMgr. Returns null if the method encounters an exception during
-   * invalidation.
-   */
-  private Pair<Db, List<TTableName>> invalidateDb(
-      MetaStoreClient msClient, String dbName, Db existingDb) {
-    try {
-      List<org.apache.hadoop.hive.metastore.api.Function> javaFns =
-          Lists.newArrayList();
-      for (String javaFn: msClient.getHiveClient().getFunctions(dbName, "*")) {
-        javaFns.add(msClient.getHiveClient().getFunction(dbName, javaFn));
-      }
-      org.apache.hadoop.hive.metastore.api.Database msDb =
-          msClient.getHiveClient().getDatabase(dbName);
-      Db newDb = new Db(dbName, this, msDb);
-      // existingDb is usually null when the Catalog loads for the first time.
-      // In that case we needn't restore any transient functions.
-      if (existingDb != null) {
-        // Restore UDFs that aren't persisted. They are only cleaned up on
-        // Catalog restart.
-        for (Function fn: existingDb.getTransientFunctions()) {
-          newDb.addFunction(fn);
-          fn.setCatalogVersion(incrementAndGetCatalogVersion());
-        }
-      }
-      // Reload native UDFs.
-      loadFunctionsFromDbParams(newDb, msDb);
-      // Reload Java UDFs from HMS.
-      loadJavaFunctions(newDb, javaFns);
-      newDb.setCatalogVersion(incrementAndGetCatalogVersion());
-
-      List<TTableName> tblsToBackgroundLoad = Lists.newArrayList();
-      for (String tableName: msClient.getHiveClient().getAllTables(dbName)) {
-        Table incompleteTbl = IncompleteTable.createUninitializedTable(
-            getNextTableId(), newDb, tableName);
-        incompleteTbl.setCatalogVersion(incrementAndGetCatalogVersion());
-        newDb.addTable(incompleteTbl);
-        if (loadInBackground_) {
-          tblsToBackgroundLoad.add(new TTableName(dbName, tableName.toLowerCase()));
-        }
-      }
-      return Pair.create(newDb, tblsToBackgroundLoad);
-    } catch (Exception e) {
-      LOG.warn("Encountered an exception while invalidating database: " + dbName +
-          ". Ignoring further load of this db.", e);
-    }
-    return null;
-  }
-
-  /**
-   * Resets this catalog instance by clearing all cached table and database metadata.
-   */
-  public void reset() throws CatalogException {
-    // First update the policy metadata.
-    if (sentryProxy_ != null) {
-      // Sentry Service is enabled.
-      try {
-        // Update the authorization policy, waiting for the result to complete.
-        sentryProxy_.refresh();
-      } catch (Exception e) {
-        throw new CatalogException("Error updating authorization policy: ", e);
-      }
-    }
-
-    catalogLock_.writeLock().lock();
-    try {
-      nextTableId_.set(0);
-
-      // Not all Java UDFs are persisted to the metastore. The ones which aren't
-      // should be restored once the catalog has been invalidated.
-      Map<String, Db> oldDbCache = dbCache_.get();
-
-      // Build a new DB cache, populate it, and replace the existing cache in one
-      // step.
-      ConcurrentHashMap<String, Db> newDbCache = new ConcurrentHashMap<String, Db>();
-      List<TTableName> tblsToBackgroundLoad = Lists.newArrayList();
-      try (MetaStoreClient msClient = getMetaStoreClient()) {
-        for (String dbName: msClient.getHiveClient().getAllDatabases()) {
-          dbName = dbName.toLowerCase();
-          Db oldDb = oldDbCache.get(dbName);
-          Pair<Db, List<TTableName>> invalidatedDb = invalidateDb(msClient,
-              dbName, oldDb);
-          if (invalidatedDb == null) continue;
-          newDbCache.put(dbName, invalidatedDb.first);
-          tblsToBackgroundLoad.addAll(invalidatedDb.second);
-        }
-      }
-      dbCache_.set(newDbCache);
-      // Submit tables for background loading.
-      for (TTableName tblName: tblsToBackgroundLoad) {
-        tableLoadingMgr_.backgroundLoad(tblName);
-      }
-    } catch (Exception e) {
-      LOG.error(e);
-      throw new CatalogException("Error initializing Catalog. Catalog may be empty.", e);
-    } finally {
-      catalogLock_.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Adds a database name to the metadata cache and returns the database's
-   * new Db object. Used by CREATE DATABASE statements.
-   */
-  public Db addDb(String dbName, org.apache.hadoop.hive.metastore.api.Database msDb)
-      throws ImpalaException {
-    Db newDb = new Db(dbName, this, msDb);
-    newDb.setCatalogVersion(incrementAndGetCatalogVersion());
-    addDb(newDb);
-    return newDb;
-  }
-
-  /**
-   * Removes a database from the metadata cache and returns the removed database,
-   * or null if the database did not exist in the cache.
-   * Used by DROP DATABASE statements.
-   */
-  @Override
-  public Db removeDb(String dbName) {
-    Db removedDb = super.removeDb(dbName);
-    if (removedDb != null) {
-      removedDb.setCatalogVersion(incrementAndGetCatalogVersion());
-    }
-    return removedDb;
-  }
-
-  /**
-   * Adds a table with the given name to the catalog and returns the new table,
-   * loading the metadata if needed.
-   */
-  public Table addTable(String dbName, String tblName) throws TableNotFoundException {
-    Db db = getDb(dbName);
-    if (db == null) return null;
-    Table incompleteTable =
-        IncompleteTable.createUninitializedTable(getNextTableId(), db, tblName);
-    incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion());
-    db.addTable(incompleteTable);
-    return db.getTable(tblName);
-  }
-
-  /**
-   * Gets the table with the given name, loading it if needed (if the existing catalog
-   * object is not yet loaded). Returns the matching Table or null if no table with
-   * this name exists in the catalog.
-   * If the existing table is dropped or modified (indicated by the catalog version
-   * changing) while the load is in progress, the loaded value will be discarded
-   * and the current cached value will be returned. This may mean that a missing table
-   * (not yet loaded table) will be returned.
-   */
-  public Table getOrLoadTable(String dbName, String tblName)
-      throws CatalogException {
-    TTableName tableName = new TTableName(dbName.toLowerCase(), tblName.toLowerCase());
-    TableLoadingMgr.LoadRequest loadReq;
-
-    long previousCatalogVersion;
-    // Return the table if it is already loaded or submit a new load request.
-    catalogLock_.readLock().lock();
-    try {
-      Table tbl = getTable(dbName, tblName);
-      if (tbl == null || tbl.isLoaded()) return tbl;
-      previousCatalogVersion = tbl.getCatalogVersion();
-      loadReq = tableLoadingMgr_.loadAsync(tableName);
-    } finally {
-      catalogLock_.readLock().unlock();
-    }
-    Preconditions.checkNotNull(loadReq);
-    try {
-      // The table may have been dropped/modified while the load was in progress, so
-      // only apply the update if the existing table hasn't changed.
-      return replaceTableIfUnchanged(loadReq.get(), previousCatalogVersion);
-    } finally {
-      loadReq.close();
-    }
-  }
-
-  /**
-   * Replaces an existing Table with a new value if it exists and has not changed
-   * (has the same catalog version as 'expectedCatalogVersion').
-   */
-  private Table replaceTableIfUnchanged(Table updatedTbl, long expectedCatalogVersion)
-      throws DatabaseNotFoundException {
-    catalogLock_.writeLock().lock();
-    try {
-      Db db = getDb(updatedTbl.getDb().getName());
-      if (db == null) {
-        throw new DatabaseNotFoundException(
-            "Database does not exist: " + updatedTbl.getDb().getName());
-      }
-
-      Table existingTbl = db.getTable(updatedTbl.getName());
-      // The existing table does not exist or has been modified. Instead of
-      // adding the loaded value, return the existing table.
-      if (existingTbl == null ||
-          existingTbl.getCatalogVersion() != expectedCatalogVersion) return existingTbl;
-
-      updatedTbl.setCatalogVersion(incrementAndGetCatalogVersion());
-      db.addTable(updatedTbl);
-      return updatedTbl;
-    } finally {
-      catalogLock_.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Removes a table from the catalog and increments the catalog version.
-   * Returns the removed Table, or null if the table or db does not exist.
-   */
-  public Table removeTable(String dbName, String tblName) {
-    Db parentDb = getDb(dbName);
-    if (parentDb == null) return null;
-
-    Table removedTable = parentDb.removeTable(tblName);
-    if (removedTable != null) {
-      removedTable.setCatalogVersion(incrementAndGetCatalogVersion());
-    }
-    return removedTable;
-  }
-
-  /**
-   * Removes a function from the catalog. Increments the catalog version and returns
-   * the Function object that was removed. If the function did not exist, null will
-   * be returned.
-   */
-  @Override
-  public Function removeFunction(Function desc) {
-    Function removedFn = super.removeFunction(desc);
-    if (removedFn != null) {
-      removedFn.setCatalogVersion(incrementAndGetCatalogVersion());
-    }
-    return removedFn;
-  }
-
-  /**
-   * Adds a function to the catalog, incrementing the catalog version. Returns true if
-   * the add was successful, false otherwise.
-   */
-  @Override
-  public boolean addFunction(Function fn) {
-    Db db = getDb(fn.getFunctionName().getDb());
-    if (db == null) return false;
-    if (db.addFunction(fn)) {
-      fn.setCatalogVersion(incrementAndGetCatalogVersion());
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * Adds a data source to the catalog, incrementing the catalog version. Returns true
-   * if the add was successful, false otherwise.
-   */
-  @Override
-  public boolean addDataSource(DataSource dataSource) {
-    if (dataSources_.add(dataSource)) {
-      dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
-      return true;
-    }
-    return false;
-  }
-
-  @Override
-  public DataSource removeDataSource(String dataSourceName) {
-    DataSource dataSource = dataSources_.remove(dataSourceName);
-    if (dataSource != null) {
-      dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
-    }
-    return dataSource;
-  }
-
-  /**
-   * Returns the table parameter 'transient_lastDdlTime', or -1 if it's not set.
-   * TODO: move this to a metastore helper class.
-   */
-  public static long getLastDdlTime(org.apache.hadoop.hive.metastore.api.Table msTbl) {
-    Preconditions.checkNotNull(msTbl);
-    Map<String, String> params = msTbl.getParameters();
-    String lastDdlTimeStr = params.get("transient_lastDdlTime");
-    if (lastDdlTimeStr != null) {
-      try {
-        return Long.parseLong(lastDdlTimeStr);
-      } catch (NumberFormatException e) {}
-    }
-    return -1;
-  }
-
-  /**
-   * Updates the cached lastDdlTime for the given table. The lastDdlTime is used during
-   * the metadata refresh() operations to determine if there have been any external
-   * (outside of Impala) modifications to the table.
-   */
-  public void updateLastDdlTime(TTableName tblName, long ddlTime) {
-    Db db = getDb(tblName.getDb_name());
-    if (db == null) return;
-    Table tbl = db.getTable(tblName.getTable_name());
-    if (tbl == null) return;
-    tbl.updateLastDdlTime(ddlTime);
-  }
-
-  /**
-   * Renames a table. Equivalent to an atomic drop + add of the table. Returns
-   * the new Table object with an incremented catalog version, or null if the
-   * operation was not successful.
-   */
-  public Table renameTable(TTableName oldTableName, TTableName newTableName)
-      throws CatalogException {
-    // Remove the old table name from the cache and add the new table.
-    Db db = getDb(oldTableName.getDb_name());
-    if (db != null) db.removeTable(oldTableName.getTable_name());
-    return addTable(newTableName.getDb_name(), newTableName.getTable_name());
-  }
-
-  /**
-   * Reloads metadata for table 'tbl'. If 'tbl' is an IncompleteTable, it makes an
-   * asynchronous request to the table loading manager to create a proper table
-   * instance and load the metadata from the Hive Metastore. Otherwise, it updates
-   * table metadata in-place by calling the load() function on the specified table.
-   * Returns 'tbl' if it is a fully loaded table (e.g. HdfsTable, HBaseTable, etc).
-   * Otherwise, returns a newly constructed fully loaded table. Applies proper
-   * synchronization to protect the metadata load from concurrent table modifications
-   * and assigns a new catalog version.
-   * Throws a CatalogException if there is an error loading table metadata.
-   */
-  public Table reloadTable(Table tbl) throws CatalogException {
-    LOG.debug(String.format("Refreshing table metadata: %s", tbl.getFullName()));
-    TTableName tblName = new TTableName(tbl.getDb().getName().toLowerCase(),
-        tbl.getName().toLowerCase());
-    Db db = tbl.getDb();
-    if (tbl instanceof IncompleteTable) {
-      TableLoadingMgr.LoadRequest loadReq;
-      long previousCatalogVersion;
-      // Return the table if it is already loaded or submit a new load request.
-      catalogLock_.readLock().lock();
-      try {
-        previousCatalogVersion = tbl.getCatalogVersion();
-        loadReq = tableLoadingMgr_.loadAsync(tblName);
-      } finally {
-        catalogLock_.readLock().unlock();
-      }
-      Preconditions.checkNotNull(loadReq);
-      try {
-        // The table may have been dropped/modified while the load was in progress, so
-        // only apply the update if the existing table hasn't changed.
-        return replaceTableIfUnchanged(loadReq.get(), previousCatalogVersion);
-      } finally {
-        loadReq.close();
-      }
-    }
-
-    catalogLock_.writeLock().lock();
-    synchronized(tbl) {
-      long newCatalogVersion = incrementAndGetCatalogVersion();
-      catalogLock_.writeLock().unlock();
-      try (MetaStoreClient msClient = getMetaStoreClient()) {
-        org.apache.hadoop.hive.metastore.api.Table msTbl = null;
-        try {
-          msTbl = msClient.getHiveClient().getTable(db.getName(),
-              tblName.getTable_name());
-        } catch (Exception e) {
-          throw new TableLoadingException("Error loading metadata for table: " +
-              db.getName() + "." + tblName.getTable_name(), e);
-        }
-        tbl.load(true, msClient.getHiveClient(), msTbl);
-      }
-      tbl.setCatalogVersion(newCatalogVersion);
-      return tbl;
-    }
-  }
-
-  /**
-   * Reloads the metadata of a table with name 'tableName'. Returns the table or null
-   * if the table does not exist.
-   */
-  public Table reloadTable(TTableName tableName) throws CatalogException {
-    Table table = getTable(tableName.getDb_name(), tableName.getTable_name());
-    if (table == null) return null;
-    return reloadTable(table);
-  }
-
-  /**
-   * Drops the partition specified in 'partitionSpec' from 'tbl'. Throws a
-   * CatalogException if 'tbl' is not an HdfsTable. If the partition having the given
-   * partition spec does not exist, null is returned. Otherwise, the modified table is
-   * returned.
-   */
-  public Table dropPartition(Table tbl, List<TPartitionKeyValue> partitionSpec)
-      throws CatalogException {
-    Preconditions.checkNotNull(tbl);
-    Preconditions.checkNotNull(partitionSpec);
-    Preconditions.checkState(Thread.holdsLock(tbl));
-    if (!(tbl instanceof HdfsTable)) {
-      throw new CatalogException("Table " + tbl.getFullName() + " is not an Hdfs table");
-    }
-    HdfsTable hdfsTable = (HdfsTable) tbl;
-    if (hdfsTable.dropPartition(partitionSpec) == null) return null;
-    return hdfsTable;
-  }
-
-  /**
-   * Adds a partition to its HdfsTable and returns the modified table.
-   */
-  public Table addPartition(HdfsPartition partition) throws CatalogException {
-    Preconditions.checkNotNull(partition);
-    HdfsTable hdfsTable = partition.getTable();
-    Db db = getDb(hdfsTable.getDb().getName());
-    hdfsTable.addPartition(partition);
-    return hdfsTable;
-  }
-
-  /**
-   * Invalidates the table in the catalog cache, potentially adding/removing the table
-   * from the cache based on whether it exists in the Hive Metastore.
-   * The invalidation logic is:
-   * - If the table exists in the metastore, add it to the catalog as an uninitialized
-   *   IncompleteTable (replacing any existing entry). The table metadata will be
-   *   loaded lazily, on the next access. If the parent database for this table does
-   *   not yet exist in Impala's cache it will also be added.
-   * - If the table does not exist in the metastore, remove it from the catalog cache.
-   * - If we are unable to determine whether the table exists in the metastore (there
-   *   was an exception thrown making the RPC), invalidate any existing Table by
-   *   replacing it with an uninitialized IncompleteTable.
-   *
-   * The parameter updatedObjects is a Pair that contains details on what catalog
-   * objects were modified as a result of the invalidateTable() call. The first item
-   * in the Pair is a Db which will only be set if a new database was added as a
-   * result of this call, otherwise it will be null. The second item in the Pair is
-   * the Table that was modified/added/removed.
-   * Returns a flag that indicates whether the items in updatedObjects were removed
-   * (returns true) or added/modified (returns false).
-   * Only Tables should ever be removed.
-   */
-  public boolean invalidateTable(TTableName tableName, Pair<Db, Table> updatedObjects) {
-    Preconditions.checkNotNull(updatedObjects);
-    updatedObjects.first = null;
-    updatedObjects.second = null;
-    LOG.debug(String.format("Invalidating table metadata: %s.%s",
-        tableName.getDb_name(), tableName.getTable_name()));
-    String dbName = tableName.getDb_name();
-    String tblName = tableName.getTable_name();
-
-    // Stores whether the table exists in the metastore. Can have three states:
-    // 1) true - Table exists in metastore.
-    // 2) false - Table does not exist in metastore.
-    // 3) unknown (null) - There was an exception thrown by the metastore client.
-    Boolean tableExistsInMetaStore;
-    Db db = null;
-    try (MetaStoreClient msClient = getMetaStoreClient()) {
-      org.apache.hadoop.hive.metastore.api.Database msDb = null;
-      try {
-        tableExistsInMetaStore = msClient.getHiveClient().tableExists(dbName, tblName);
-      } catch (UnknownDBException e) {
-        // The parent database does not exist in the metastore. Treat this the same
-        // as if the table does not exist.
-        tableExistsInMetaStore = false;
-      } catch (TException e) {
-        LOG.error("Error executing tableExists() metastore call: " + tblName, e);
-        tableExistsInMetaStore = null;
-      }
-
-      if (tableExistsInMetaStore != null && !tableExistsInMetaStore) {
-        updatedObjects.second = removeTable(dbName, tblName);
-        return true;
-      }
-
-      db = getDb(dbName);
-      if ((db == null || !db.containsTable(tblName)) && tableExistsInMetaStore == null) {
-        // The table does not exist in our cache AND it is unknown whether the
-        // table exists in the metastore. Do nothing.
-        return false;
-      } else if (db == null && tableExistsInMetaStore) {
-        // The table exists in the metastore, but our cache does not contain the parent
-        // database. A new db will be added to the cache along with the new table. msDb
-        // must be valid since tableExistsInMetaStore is true.
-        try {
-          msDb = msClient.getHiveClient().getDatabase(dbName);
-          Preconditions.checkNotNull(msDb);
-          db = new Db(dbName, this, msDb);
-          db.setCatalogVersion(incrementAndGetCatalogVersion());
-          addDb(db);
-          updatedObjects.first = db;
-        } catch (TException e) {
-          // The metastore database could not be fetched. Log the error and return.
-          LOG.error("Error executing getDatabase() metastore call: " + dbName, e);
-          return false;
-        }
-      }
-    }
-
-    // Add a new uninitialized table to the table cache, effectively invalidating
-    // any existing entry. The metadata for the table will be loaded lazily, on
-    // the next access to the table.
-    Table newTable = IncompleteTable.createUninitializedTable(
-        getNextTableId(), db, tblName);
-    newTable.setCatalogVersion(incrementAndGetCatalogVersion());
-    db.addTable(newTable);
-    if (loadInBackground_) {
-      tableLoadingMgr_.backgroundLoad(new TTableName(dbName.toLowerCase(),
-          tblName.toLowerCase()));
-    }
-    updatedObjects.second = newTable;
-    return false;
-  }
-
-  /**
-   * Adds a new role with the given name and grant groups to the AuthorizationPolicy.
-   * If a role with the same name already exists it will be overwritten.
-   */
-  public Role addRole(String roleName, Set<String> grantGroups) {
-    catalogLock_.writeLock().lock();
-    try {
-      Role role = new Role(roleName, grantGroups);
-      role.setCatalogVersion(incrementAndGetCatalogVersion());
-      authPolicy_.addRole(role);
-      return role;
-    } finally {
-      catalogLock_.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Removes the role with the given name from the AuthorizationPolicy.
-   * Returns the removed role with an incremented catalog version, or null if no role
-   * with this name exists.
-   */
-  public Role removeRole(String roleName) {
-    catalogLock_.writeLock().lock();
-    try {
-      Role role = authPolicy_.removeRole(roleName);
-      if (role == null) return null;
-      role.setCatalogVersion(incrementAndGetCatalogVersion());
-      return role;
-    } finally {
-      catalogLock_.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Adds a grant group to the given role name and returns the modified Role with
-   * an updated catalog version. If the role does not exist a CatalogException is
-   * thrown.
-   */
-  public Role addRoleGrantGroup(String roleName, String groupName)
-      throws CatalogException {
-    catalogLock_.writeLock().lock();
-    try {
-      Role role = authPolicy_.addGrantGroup(roleName, groupName);
-      Preconditions.checkNotNull(role);
-      role.setCatalogVersion(incrementAndGetCatalogVersion());
-      return role;
-    } finally {
-      catalogLock_.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Removes a grant group from the given role name and returns the modified Role with
-   * an updated catalog version. If the role does not exist a CatalogException is
-   * thrown.
-   */
-  public Role removeRoleGrantGroup(String roleName, String groupName)
-      throws CatalogException {
-    catalogLock_.writeLock().lock();
-    try {
-      Role role = authPolicy_.removeGrantGroup(roleName, groupName);
-      Preconditions.checkNotNull(role);
-      role.setCatalogVersion(incrementAndGetCatalogVersion());
-      return role;
-    } finally {
-      catalogLock_.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Adds a privilege to the given role name. Returns the new RolePrivilege and
-   * increments the catalog version. If the parent role does not exist a
-   * CatalogException is thrown.
-   */
-  public RolePrivilege addRolePrivilege(String roleName, TPrivilege thriftPriv)
-      throws CatalogException {
-    catalogLock_.writeLock().lock();
-    try {
-      Role role = authPolicy_.getRole(roleName);
-      if (role == null) throw new CatalogException("Role does not exist: " + roleName);
-      RolePrivilege priv = RolePrivilege.fromThrift(thriftPriv);
-      priv.setCatalogVersion(incrementAndGetCatalogVersion());
-      authPolicy_.addPrivilege(priv);
-      return priv;
-    } finally {
-      catalogLock_.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Removes a RolePrivilege from the given role name. Returns the removed
-   * RolePrivilege with an incremented catalog version, or null if no matching
-   * privilege was found. Throws a CatalogException if no role exists with this name.
-   */
-  public RolePrivilege removeRolePrivilege(String roleName, TPrivilege thriftPriv)
-      throws CatalogException {
-    catalogLock_.writeLock().lock();
-    try {
-      Role role = authPolicy_.getRole(roleName);
-      if (role == null) throw new CatalogException("Role does not exist: " + roleName);
-      RolePrivilege rolePrivilege =
-          role.removePrivilege(thriftPriv.getPrivilege_name());
-      if (rolePrivilege == null) return null;
-      rolePrivilege.setCatalogVersion(incrementAndGetCatalogVersion());
-      return rolePrivilege;
-    } finally {
-      catalogLock_.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Gets a RolePrivilege from the given role name. Returns the privilege if it
-   * exists, or null if no privilege matching the privilege spec exists.
-   * Throws a CatalogException if the role does not exist.
-   */
-  public RolePrivilege getRolePrivilege(String roleName, TPrivilege privSpec)
-      throws CatalogException {
-    catalogLock_.readLock().lock();
-    try {
-      Role role = authPolicy_.getRole(roleName);
-      if (role == null) throw new CatalogException("Role does not exist: " + roleName);
-      return role.getPrivilege(privSpec.getPrivilege_name());
-    } finally {
-      catalogLock_.readLock().unlock();
-    }
-  }
-
-  /**
-   * Increments the current Catalog version and returns the new value.
-   */
-  public long incrementAndGetCatalogVersion() {
-    catalogLock_.writeLock().lock();
-    try {
-      return ++catalogVersion_;
-    } finally {
-      catalogLock_.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Returns the current Catalog version.
-   */
-  public long getCatalogVersion() {
-    catalogLock_.readLock().lock();
-    try {
-      return catalogVersion_;
-    } finally {
-      catalogLock_.readLock().unlock();
-    }
-  }
-
-  public ReentrantReadWriteLock getLock() { return catalogLock_; }
-
-  /**
-   * Gets the next table ID and increments the table ID counter.
-   */
-  public TableId getNextTableId() { return new TableId(nextTableId_.getAndIncrement()); }
-  public SentryProxy getSentryProxy() { return sentryProxy_; }
-  public AuthorizationPolicy getAuthPolicy() { return authPolicy_; }
-
-  /**
-   * Reloads metadata for the partition defined by the partition spec
-   * 'partitionSpec' in table 'tbl'. Returns the table object with the partition
-   * metadata reloaded.
-   */
-  public Table reloadPartition(Table tbl, List<TPartitionKeyValue> partitionSpec)
-      throws CatalogException {
-    catalogLock_.writeLock().lock();
-    synchronized (tbl) {
-      long newCatalogVersion = incrementAndGetCatalogVersion();
-      catalogLock_.writeLock().unlock();
-      HdfsTable hdfsTable = (HdfsTable) tbl;
-      HdfsPartition hdfsPartition = hdfsTable
-          .getPartitionFromThriftPartitionSpec(partitionSpec);
-      // Retrieve the partition name from the existing partition, or construct it
-      // from the partition spec.
-      String partitionName = hdfsPartition == null
-          ? HdfsTable.constructPartitionName(partitionSpec)
-          : hdfsPartition.getPartitionName();
-      LOG.debug(String.format("Refreshing Partition metadata: %s %s",
-          hdfsTable.getFullName(), partitionName));
-      try (MetaStoreClient msClient = getMetaStoreClient()) {
-        org.apache.hadoop.hive.metastore.api.Partition hmsPartition = null;
-        try {
-          hmsPartition = msClient.getHiveClient().getPartition(
-              hdfsTable.getDb().getName(), hdfsTable.getName(), partitionName);
-        } catch (NoSuchObjectException e) {
-          // If the partition does not exist in the Hive Metastore, remove it from
-          // the catalog.
-          if (hdfsPartition != null) {
-            hdfsTable.dropPartition(partitionSpec);
-            hdfsTable.setCatalogVersion(newCatalogVersion);
-          }
-          return hdfsTable;
-        } catch (Exception e) {
-          throw new CatalogException("Error loading metadata for partition: "
-              + hdfsTable.getFullName() + " " + partitionName, e);
-        }
-        hdfsTable.reloadPartition(hdfsPartition, hmsPartition);
-      }
-      hdfsTable.setCatalogVersion(newCatalogVersion);
-      return hdfsTable;
-    }
-  }
-}
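
[Editor's note: the class comment in the diff above describes a global, strictly
increasing "catalog version" guarded by a fair read/write lock. The following is a
minimal, self-contained sketch of that pattern; it is illustrative only, not part of
this change, and the class and method names are hypothetical stand-ins.]

import java.util.concurrent.locks.ReentrantReadWriteLock;

class VersionedCache {
  // Fair lock, like catalogLock_ above: readers snapshot, writers bump the version.
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
  private long version = 0;  // analogous to catalogVersion_

  // Analogous to incrementAndGetCatalogVersion(): each mutation is stamped with a
  // unique, strictly increasing version number.
  long incrementAndGet() {
    lock.writeLock().lock();
    try {
      return ++version;
    } finally {
      lock.writeLock().unlock();
    }
  }

  // Analogous to getCatalogVersion(): a consistent read of the current version.
  long get() {
    lock.readLock().lock();
    try {
      return version;
    } finally {
      lock.readLock().unlock();
    }
  }

  public static void main(String[] args) {
    VersionedCache cache = new VersionedCache();
    long v1 = cache.incrementAndGet();  // e.g. an ADD
    long v2 = cache.incrementAndGet();  // e.g. a DROP
    assert v2 > v1;                     // versions are strictly increasing
    System.out.println("current version: " + cache.get());
  }
}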
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/Column.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/Column.java b/fe/src/main/java/com/cloudera/impala/catalog/Column.java
deleted file mode 100644
index b2d7416..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/Column.java
+++ /dev/null
@@ -1,132 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-import java.util.List;
-
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.thrift.TColumn;
-import com.cloudera.impala.thrift.TColumnStats;
-import com.google.common.base.Function;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * Internal representation of column-related metadata.
- * Owned by Catalog instance.
- */
-public class Column {
-  private final static Logger LOG = LoggerFactory.getLogger(Column.class);
-
-  protected final String name_;
-  protected final Type type_;
-  protected final String comment_;
-  protected int position_;  // in table
-
-  protected final ColumnStats stats_;
-
-  public Column(String name, Type type, int position) {
-    this(name, type, null, position);
-  }
-
-  public Column(String name, Type type, String comment, int position) {
-    name_ = name;
-    type_ = type;
-    comment_ = comment;
-    position_ = position;
-    stats_ = new ColumnStats(type);
-  }
-
-  public String getComment() { return comment_; }
-  public String getName() { return name_; }
-  public Type getType() { return type_; }
-  public int getPosition() { return position_; }
-  public void setPosition(int position) { this.position_ = position; }
-  public ColumnStats getStats() { return stats_; }
-
-  public boolean updateStats(ColumnStatisticsData statsData) {
-    boolean statsDataCompatibleWithColType = stats_.update(type_, statsData);
-    LOG.debug("col stats: " + name_ + " #distinct=" + stats_.getNumDistinctValues());
-    return statsDataCompatibleWithColType;
-  }
-
-  public void updateStats(TColumnStats statsData) {
-    stats_.update(type_, statsData);
-  }
-
-  @Override
-  public String toString() {
-    return Objects.toStringHelper(this.getClass())
-        .add("name_", name_)
-        .add("type_", type_)
-        .add("comment_", comment_)
-        .add("stats", stats_)
-        .add("position_", position_).toString();
-  }
-
-  public static Column fromThrift(TColumn columnDesc) {
-    String comment = columnDesc.isSetComment() ? columnDesc.getComment() : null;
-    Preconditions.checkState(columnDesc.isSetPosition());
-    int position = columnDesc.getPosition();
-    Column col;
-    if (columnDesc.isIs_hbase_column()) {
-      // HBase table column. The HBase column qualifier (column name) is not set for
-      // the HBase row key, so it being set in the thrift struct is not a precondition.
-      Preconditions.checkState(columnDesc.isSetColumn_family());
-      Preconditions.checkState(columnDesc.isSetIs_binary());
-      col = new HBaseColumn(columnDesc.getColumnName(), columnDesc.getColumn_family(),
-          columnDesc.getColumn_qualifier(), columnDesc.isIs_binary(),
-          Type.fromThrift(columnDesc.getColumnType()), comment, position);
-    } else if (columnDesc.isIs_kudu_column()) {
-      Preconditions.checkState(columnDesc.isSetIs_key());
-      Preconditions.checkState(columnDesc.isSetIs_nullable());
-      col = new KuduColumn(columnDesc.getColumnName(), columnDesc.isIs_key(),
-          columnDesc.isIs_nullable(),
-          Type.fromThrift(columnDesc.getColumnType()), comment, position);
-    } else {
-      // Hdfs table column.
-      col = new Column(columnDesc.getColumnName(),
-          Type.fromThrift(columnDesc.getColumnType()), comment, position);
-    }
-    if (columnDesc.isSetCol_stats()) col.updateStats(columnDesc.getCol_stats());
-    return col;
-  }
-
-  public TColumn toThrift() {
-    TColumn colDesc = new TColumn(name_, type_.toThrift());
-    if (comment_ != null) colDesc.setComment(comment_);
-    colDesc.setPosition(position_);
-    colDesc.setCol_stats(getStats().toThrift());
-    return colDesc;
-  }
-
-  public static List<FieldSchema> toFieldSchemas(List<Column> columns) {
-    return Lists.transform(columns, new Function<Column, FieldSchema>() {
-      public FieldSchema apply(Column column) {
-        Preconditions.checkNotNull(column.getType());
-        return new FieldSchema(column.getName(), column.getType().toSql(),
-            column.getComment());
-      }
-    });
-  }
-}
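
[Editor's note: Column.fromThrift() above dispatches on thrift flags to pick the
right Column subclass (HBase, Kudu, or the generic HDFS column). Below is a minimal,
self-contained sketch of that factory pattern; the Desc struct and class names are
hypothetical stand-ins for the real thrift struct and catalog classes, not part of
this change.]

class ColumnFactoryExample {
  // Simplified stand-in for the TColumn thrift struct.
  static class Desc {
    String name;
    boolean isHBaseColumn;
    boolean isKuduColumn;
  }

  static class Col { final String name; Col(String n) { name = n; } }
  static class HBaseCol extends Col { HBaseCol(String n) { super(n); } }
  static class KuduCol extends Col { KuduCol(String n) { super(n); } }

  // Like fromThrift(): check the type-specific flags first and fall through to
  // the generic (HDFS) column when none are set.
  static Col fromDesc(Desc d) {
    if (d.isHBaseColumn) return new HBaseCol(d.name);
    if (d.isKuduColumn) return new KuduCol(d.name);
    return new Col(d.name);
  }
}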
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/ColumnNotFoundException.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/ColumnNotFoundException.java b/fe/src/main/java/com/cloudera/impala/catalog/ColumnNotFoundException.java
deleted file mode 100644
index 4ea47c1..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/ColumnNotFoundException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-
-/**
- * Thrown when a column cannot be found in the catalog.
- */
-public class ColumnNotFoundException extends CatalogException {
-  // Dummy serial UID to avoid Eclipse warnings
-  private static final long serialVersionUID = -2203080667446640542L;
-
-  public ColumnNotFoundException(String s) { super(s); }
-
-  public ColumnNotFoundException(String s, Exception cause) { super(s, cause); }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/ColumnStats.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/ColumnStats.java b/fe/src/main/java/com/cloudera/impala/catalog/ColumnStats.java
deleted file mode 100644
index 8f8e4b3..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/ColumnStats.java
+++ /dev/null
@@ -1,334 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-import java.util.Set;
-
-import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
-import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.SlotRef;
-import com.cloudera.impala.thrift.TColumnStats;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-
-/**
- * Statistics for a single column.
- */
-public class ColumnStats {
-  private final static Logger LOG = LoggerFactory.getLogger(ColumnStats.class);
-
-  // Set of the currently supported column stats column types.
-  private final static Set<PrimitiveType> SUPPORTED_COL_TYPES = Sets.newHashSet(
-      PrimitiveType.BIGINT, PrimitiveType.BINARY, PrimitiveType.BOOLEAN,
-      PrimitiveType.DOUBLE, PrimitiveType.FLOAT, PrimitiveType.INT,
-      PrimitiveType.SMALLINT, PrimitiveType.CHAR, PrimitiveType.VARCHAR,
-      PrimitiveType.STRING, PrimitiveType.TIMESTAMP, PrimitiveType.TINYINT,
-      PrimitiveType.DECIMAL);
-
-  public enum StatsKey {
-    NUM_DISTINCT_VALUES("numDVs"),
-    NUM_NULLS("numNulls"),
-    AVG_SIZE("avgSize"),
-    MAX_SIZE("maxSize");
-
-    private final String name_;
-
-    private StatsKey(String name) { name_ = name; }
-
-    /**
-     * Returns the StatsKey whose name matches 'key'. The comparison is
-     * case insensitive. Returns null if there is no matching StatsKey.
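
[Editor's note: ColumnNotFoundException above follows the catalog's convention of
one thin CatalogException subclass per lookup failure, so callers can catch a
specific failure mode. A minimal sketch of how such an exception is typically used
(illustrative only; the lookup method and map here are hypothetical):]

import java.util.Map;

class ColumnLookupExample {
  // Throwing a dedicated exception type lets callers distinguish "column missing"
  // from other catalog errors without string-matching messages.
  static String findColumnType(Map<String, String> columns, String name)
      throws Exception {
    String type = columns.get(name);
    if (type == null) {
      throw new Exception("Column not found: " + name);  // stand-in for
    }                                                    // ColumnNotFoundException
    return type;
  }
}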
- */ - public static StatsKey fromString(String key) { - for (StatsKey k: values()) { - if (key.equalsIgnoreCase(k.name_)) return k; - } - return null; - } - - @Override - public String toString() { return name_; } - } - - // In bytes; excludes serialization overhead. - private double avgSize_; - // In bytes; includes serialization overhead. - private double avgSerializedSize_; - private long maxSize_;  // in bytes - private long numDistinctValues_; - private long numNulls_; - - public ColumnStats(Type colType) { - initColStats(colType); - } - - /** - * C'tor for clone(). - */ - private ColumnStats(ColumnStats other) { - avgSize_ = other.avgSize_; - avgSerializedSize_ = other.avgSerializedSize_; - maxSize_ = other.maxSize_; - numDistinctValues_ = other.numDistinctValues_; - numNulls_ = other.numNulls_; - } - - /** - * Initializes all column stats values as "unknown". For fixed-length types - * (those which don't need additional storage besides the slot they occupy), - * sets avgSize, avgSerializedSize, and maxSize to the slot size. - */ - private void initColStats(Type colType) { - avgSize_ = -1; - avgSerializedSize_ = -1; - maxSize_ = -1; - numDistinctValues_ = -1; - numNulls_ = -1; - if (colType.isFixedLengthType()) { - avgSerializedSize_ = colType.getSlotSize(); - avgSize_ = colType.getSlotSize(); - maxSize_ = colType.getSlotSize(); - } - } - - /** - * Creates ColumnStats from the given expr. Sets numDistinctValues and, if the expr - * is a SlotRef with stats, also numNulls and the size stats. - */ - public static ColumnStats fromExpr(Expr expr) { - Preconditions.checkNotNull(expr); - Preconditions.checkState(expr.getType().isValid()); - ColumnStats stats = new ColumnStats(expr.getType()); - stats.setNumDistinctValues(expr.getNumDistinctValues()); - SlotRef slotRef = expr.unwrapSlotRef(false); - if (slotRef == null) return stats; - ColumnStats slotStats = slotRef.getDesc().getStats(); - if (slotStats == null) return stats; - stats.numNulls_ = slotStats.getNumNulls(); - stats.avgSerializedSize_ = slotStats.getAvgSerializedSize(); - stats.avgSize_ = slotStats.getAvgSize(); - stats.maxSize_ = slotStats.getMaxSize(); - return stats; - } - - /** - * Adds other's numDistinctValues and numNulls to this ColumnStats. - * If this or other's stats are invalid, sets the corresponding stat to invalid. - * Returns this with the updated stats. - * This method is used to aggregate stats for slots that originate from multiple - * source slots, e.g., those produced by union queries.
- */ - public ColumnStats add(ColumnStats other) { - if (numDistinctValues_ == -1 || other.numDistinctValues_ == -1) { - numDistinctValues_ = -1; - } else { - numDistinctValues_ += other.numDistinctValues_; - } - if (numNulls_ == -1 || other.numNulls_ == -1) { - numNulls_ = -1; - } else { - numNulls_ += other.numNulls_; - } - return this; - } - - public void setAvgSize(float avgSize) { avgSize_ = avgSize; } - public void setAvgSerializedSize(float avgSize) { avgSerializedSize_ = avgSize; } - public void setMaxSize(long maxSize) { maxSize_ = maxSize; } - public long getNumDistinctValues() { return numDistinctValues_; } - public void setNumDistinctValues(long numDistinctValues) { - numDistinctValues_ = numDistinctValues; - } - public void setNumNulls(long numNulls) { numNulls_ = numNulls; } - public double getAvgSerializedSize() { return avgSerializedSize_; } - public double getAvgSize() { return avgSize_; } - public long getMaxSize() { return maxSize_; } - public boolean hasNulls() { return numNulls_ > 0; } - public long getNumNulls() { return numNulls_; } - public boolean hasAvgSerializedSize() { return avgSerializedSize_ >= 0; } - public boolean hasMaxSize() { return maxSize_ >= 0; } - public boolean hasNumDistinctValues() { return numDistinctValues_ >= 0; } - public boolean hasStats() { return numNulls_ != -1 || numDistinctValues_ != -1; } - - /** - * Updates the stats with the given ColumnStatisticsData. If the ColumnStatisticsData - * is not compatible with the given colType, all stats are initialized based on - * initColStats(). - * Returns false if the ColumnStatisticsData was incompatible with the given - * column type; otherwise returns true. - */ - public boolean update(Type colType, ColumnStatisticsData statsData) { - Preconditions.checkState(isSupportedColType(colType)); - initColStats(colType); - boolean isCompatible = false; - switch (colType.getPrimitiveType()) { - case BOOLEAN: - isCompatible = statsData.isSetBooleanStats(); - if (isCompatible) { - BooleanColumnStatsData boolStats = statsData.getBooleanStats(); - numNulls_ = boolStats.getNumNulls(); - numDistinctValues_ = (numNulls_ > 0) ? 3 : 2; - } - break; - case TINYINT: - case SMALLINT: - case INT: - case BIGINT: - case TIMESTAMP: // Hive and Impala use LongColumnStatsData for timestamps.
- isCompatible = statsData.isSetLongStats(); - if (isCompatible) { - LongColumnStatsData longStats = statsData.getLongStats(); - numDistinctValues_ = longStats.getNumDVs(); - numNulls_ = longStats.getNumNulls(); - } - break; - case FLOAT: - case DOUBLE: - isCompatible = statsData.isSetDoubleStats(); - if (isCompatible) { - DoubleColumnStatsData doubleStats = statsData.getDoubleStats(); - numDistinctValues_ = doubleStats.getNumDVs(); - numNulls_ = doubleStats.getNumNulls(); - } - break; - case CHAR: - case VARCHAR: - case STRING: - isCompatible = statsData.isSetStringStats(); - if (isCompatible) { - StringColumnStatsData stringStats = statsData.getStringStats(); - numDistinctValues_ = stringStats.getNumDVs(); - numNulls_ = stringStats.getNumNulls(); - maxSize_ = stringStats.getMaxColLen(); - avgSize_ = Double.valueOf(stringStats.getAvgColLen()).floatValue(); - avgSerializedSize_ = avgSize_ + PrimitiveType.STRING.getSlotSize(); - } - break; - case BINARY: - isCompatible = statsData.isSetBinaryStats(); - if (isCompatible) { - BinaryColumnStatsData binaryStats = statsData.getBinaryStats(); - numNulls_ = binaryStats.getNumNulls(); - maxSize_ = binaryStats.getMaxColLen(); - avgSize_ = Double.valueOf(binaryStats.getAvgColLen()).floatValue(); - avgSerializedSize_ = avgSize_ + PrimitiveType.BINARY.getSlotSize(); - } - break; - case DECIMAL: - isCompatible = statsData.isSetDecimalStats(); - if (isCompatible) { - DecimalColumnStatsData decimalStats = statsData.getDecimalStats(); - numNulls_ = decimalStats.getNumNulls(); - numDistinctValues_ = decimalStats.getNumDVs(); - } - break; - default: - Preconditions.checkState(false, - "Unexpected column type: " + colType.toString()); - break; - } - return isCompatible; - } - - /** - * Sets the member corresponding to the given stats key to 'value'. - * Requires that the given value is of a type appropriate for the - * member being set. Throws if that is not the case. - */ - public void update(StatsKey key, Number value) { - Preconditions.checkNotNull(key); - Preconditions.checkNotNull(value); - if (key == StatsKey.AVG_SIZE) { - Preconditions.checkArgument(value instanceof Float); - } else { - Preconditions.checkArgument(value instanceof Long); - } - switch (key) { - case NUM_DISTINCT_VALUES: { - numDistinctValues_ = (Long) value; - break; - } - case NUM_NULLS: { - numNulls_ = (Long) value; - break; - } - case AVG_SIZE: { - avgSize_ = (Float) value; - break; - } - case MAX_SIZE: { - maxSize_ = (Long) value; - break; - } - default: Preconditions.checkState(false); - } - } - - /** - * Returns true if the given column type supports column stats updates.
- */ - public static boolean isSupportedColType(Type colType) { - if (!colType.isScalarType()) return false; - ScalarType scalarType = (ScalarType) colType; - return SUPPORTED_COL_TYPES.contains(scalarType.getPrimitiveType()); - } - - public void update(Type colType, TColumnStats stats) { - initColStats(colType); - avgSize_ = Double.valueOf(stats.getAvg_size()).floatValue(); - if (colType.getPrimitiveType() == PrimitiveType.STRING || - colType.getPrimitiveType() == PrimitiveType.BINARY) { - avgSerializedSize_ = colType.getSlotSize() + avgSize_; - } - maxSize_ = stats.getMax_size(); - numDistinctValues_ = stats.getNum_distinct_values(); - numNulls_ = stats.getNum_nulls(); - } - - public TColumnStats toThrift() { - TColumnStats colStats = new TColumnStats(); - colStats.setAvg_size(avgSize_); - colStats.setMax_size(maxSize_); - colStats.setNum_distinct_values(numDistinctValues_); - colStats.setNum_nulls(numNulls_); - return colStats; - } - - @Override - public String toString() { - return Objects.toStringHelper(this.getClass()) - .add("avgSerializedSize_", avgSerializedSize_) - .add("maxSize_", maxSize_) - .add("numDistinct_", numDistinctValues_) - .add("numNulls_", numNulls_) - .toString(); - } - - @Override - public ColumnStats clone() { return new ColumnStats(this); } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/DataSource.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/DataSource.java b/fe/src/main/java/com/cloudera/impala/catalog/DataSource.java deleted file mode 100644 index ed0d9ee..0000000 --- a/fe/src/main/java/com/cloudera/impala/catalog/DataSource.java +++ /dev/null @@ -1,88 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.catalog; - -import org.apache.hadoop.fs.Path; - -import com.cloudera.impala.thrift.TCatalogObjectType; -import com.cloudera.impala.thrift.TDataSource; -import com.google.common.base.Objects; - -/** - * Represents a data source in the catalog. Contains the data source name and all - * information needed to locate and load the data source. - */ -public class DataSource implements CatalogObject { - private final String dataSrcName_; - private final String className_; - private final String apiVersionString_; - // Qualified path to the data source. 
- private final String location_; - private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION; - - public DataSource(String dataSrcName, String location, String className, - String apiVersionString) { - dataSrcName_ = dataSrcName; - location_ = location; - className_ = className; - apiVersionString_ = apiVersionString; - } - - public static DataSource fromThrift(TDataSource thrift) { - return new DataSource(thrift.getName(), thrift.getHdfs_location(), - thrift.getClass_name(), thrift.getApi_version()); - } - - @Override - public TCatalogObjectType getCatalogObjectType() { - return TCatalogObjectType.DATA_SOURCE; - } - - @Override - public long getCatalogVersion() { return catalogVersion_; } - - @Override - public void setCatalogVersion(long newVersion) { catalogVersion_ = newVersion; } - - @Override - public String getName() { return dataSrcName_; } - - @Override - public boolean isLoaded() { return true; } - - public String getLocation() { return location_; } - public String getClassName() { return className_; } - public String getApiVersion() { return apiVersionString_; } - - public TDataSource toThrift() { - return new TDataSource(getName(), location_, className_, apiVersionString_); - } - - public String debugString() { - return Objects.toStringHelper(this) - .add("name", dataSrcName_) - .add("location", location_) - .add("className", className_) - .add("apiVersion", apiVersionString_) - .toString(); - } - - public static String debugString(TDataSource thrift) { - return fromThrift(thrift).debugString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/DataSourceTable.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/DataSourceTable.java b/fe/src/main/java/com/cloudera/impala/catalog/DataSourceTable.java deleted file mode 100644 index c42c804..0000000 --- a/fe/src/main/java/com/cloudera/impala/catalog/DataSourceTable.java +++ /dev/null @@ -1,259 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package com.cloudera.impala.catalog; - -import java.util.List; -import java.util.Set; - -import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.extdatasource.v1.ExternalDataSource; -import com.cloudera.impala.thrift.TCatalogObjectType; -import com.cloudera.impala.thrift.TColumn; -import com.cloudera.impala.thrift.TDataSource; -import com.cloudera.impala.thrift.TDataSourceTable; -import com.cloudera.impala.thrift.TResultSet; -import com.cloudera.impala.thrift.TResultSetMetadata; -import com.cloudera.impala.thrift.TTable; -import com.cloudera.impala.thrift.TTableDescriptor; -import com.cloudera.impala.thrift.TTableType; -import com.cloudera.impala.util.TResultRowBuilder; -import com.google.common.base.Preconditions; - -/** - * Represents a table backed by an external data source. All data source properties are - * stored as table properties (persisted in the metastore) because the DataSource - * catalog object is not persisted: it will not exist after the catalog server is - * restarted, but the table does not need it in order to be scanned. Tables that - * contain the TBL_PROP_DATA_SRC_NAME table parameter are assumed to be backed by an - * external data source. - */ -public class DataSourceTable extends Table { - private final static Logger LOG = LoggerFactory.getLogger(DataSourceTable.class); - - /** - * Table property key for the data source name. - */ - public static final String TBL_PROP_DATA_SRC_NAME = "__IMPALA_DATA_SOURCE_NAME"; - - /** - * Table property key for the table init string. - */ - public static final String TBL_PROP_INIT_STRING = "__IMPALA_DATA_SOURCE_INIT_STRING"; - - /** - * Table property key for the data source library HDFS path. - */ - public static final String TBL_PROP_LOCATION = "__IMPALA_DATA_SOURCE_LOCATION"; - - /** - * Table property key for the class implementing {@link ExternalDataSource}. - */ - public static final String TBL_PROP_CLASS = "__IMPALA_DATA_SOURCE_CLASS"; - - /** - * Table property key for the API version implemented by the data source. - */ - public static final String TBL_PROP_API_VER = "__IMPALA_DATA_SOURCE_API_VERSION"; - - private String initString_; - private TDataSource dataSource_; - - protected DataSourceTable( - TableId id, org.apache.hadoop.hive.metastore.api.Table msTable, - Db db, String name, String owner) { - super(id, msTable, db, name, owner); - } - - /** - * Gets the data source. - */ - public TDataSource getDataSource() { return dataSource_; } - - /** - * Gets the table init string passed to the data source. - */ - public String getInitString() { return initString_; } - - public int getNumNodes() { return 1; } - - @Override - public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.TABLE; } - - /** - * Returns true if the column type is supported. - */ - public static boolean isSupportedColumnType(Type colType) { - Preconditions.checkNotNull(colType); - return isSupportedPrimitiveType(colType.getPrimitiveType()); - } - - /** - * Returns true if the primitive type is supported.
- */ - public static boolean isSupportedPrimitiveType(PrimitiveType primitiveType) { - Preconditions.checkNotNull(primitiveType); - switch (primitiveType) { - case BIGINT: - case INT: - case SMALLINT: - case TINYINT: - case DOUBLE: - case FLOAT: - case BOOLEAN: - case STRING: - case TIMESTAMP: - case DECIMAL: - return true; - case BINARY: - case CHAR: - case DATE: - case DATETIME: - case INVALID_TYPE: - case NULL_TYPE: - default: - return false; - } - } - - /** - * Create columns corresponding to fieldSchemas. - * Throws a TableLoadingException if the metadata is incompatible with what we - * support. - */ - private void loadColumns(List<FieldSchema> fieldSchemas, IMetaStoreClient client) - throws TableLoadingException { - int pos = 0; - for (FieldSchema s: fieldSchemas) { - Column col = new Column(s.getName(), parseColumnType(s), s.getComment(), pos); - Preconditions.checkArgument(isSupportedColumnType(col.getType())); - addColumn(col); - ++pos; - } - } - - @Override - protected void loadFromThrift(TTable thriftTable) throws TableLoadingException { - super.loadFromThrift(thriftTable); - TDataSourceTable dataSourceTable = thriftTable.getData_source_table(); - initString_ = dataSourceTable.getInit_string(); - dataSource_ = dataSourceTable.getData_source(); - } - - @Override - public void load(boolean reuseMetadata, IMetaStoreClient client, - org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException { - Preconditions.checkNotNull(msTbl); - msTable_ = msTbl; - clearColumns(); - LOG.debug("load table: " + db_.getName() + "." + name_); - String dataSourceName = getRequiredTableProperty(msTbl, TBL_PROP_DATA_SRC_NAME, null); - String location = getRequiredTableProperty(msTbl, TBL_PROP_LOCATION, dataSourceName); - String className = getRequiredTableProperty(msTbl, TBL_PROP_CLASS, dataSourceName); - String apiVersionString = getRequiredTableProperty(msTbl, TBL_PROP_API_VER, - dataSourceName); - dataSource_ = new TDataSource(dataSourceName, location, className, apiVersionString); - initString_ = getRequiredTableProperty(msTbl, TBL_PROP_INIT_STRING, dataSourceName); - - if (msTbl.getPartitionKeysSize() > 0) { - throw new TableLoadingException("Data source table cannot contain clustering " + - "columns: " + name_); - } - numClusteringCols_ = 0; - - try { - // Create column objects. - List<FieldSchema> fieldSchemas = getMetaStoreTable().getSd().getCols(); - loadColumns(fieldSchemas, client); - - // Set table stats. - numRows_ = getRowCount(super.getMetaStoreTable().getParameters()); - } catch (Exception e) { - throw new TableLoadingException("Failed to load metadata for data source table: " + - name_, e); - } - } - - private String getRequiredTableProperty( - org.apache.hadoop.hive.metastore.api.Table msTbl, String key, String dataSourceName) - throws TableLoadingException { - String val = msTbl.getParameters().get(key); - if (val == null) { - throw new TableLoadingException(String.format("Failed to load table %s produced " + - "by external data source %s. Missing required metadata: %s", name_, - dataSourceName == null ? "<unknown>" : dataSourceName, key)); - } - return val; - } - - /** - * Returns statistics on this table as a tabular result set. Used for the - * SHOW TABLE STATS statement. The schema of the returned TResultSet is set - * inside this method. 
- */ - public TResultSet getTableStats() { - TResultSet result = new TResultSet(); - TResultSetMetadata resultSchema = new TResultSetMetadata(); - resultSchema.addToColumns(new TColumn("#Rows", Type.BIGINT.toThrift())); - result.setSchema(resultSchema); - TResultRowBuilder rowBuilder = new TResultRowBuilder(); - rowBuilder.add(numRows_); - result.addToRows(rowBuilder.get()); - return result; - } - - @Override - public TTableDescriptor toThriftDescriptor(Set<Long> referencedPartitions) { - TTableDescriptor tableDesc = new TTableDescriptor(id_.asInt(), - TTableType.DATA_SOURCE_TABLE, getTColumnDescriptors(), numClusteringCols_, - name_, db_.getName()); - tableDesc.setDataSourceTable(getDataSourceTable()); - return tableDesc; - } - - /** - * Returns a thrift structure representing the table. - */ - @Override - public TTable toThrift() { - TTable table = super.toThrift(); - table.setTable_type(TTableType.DATA_SOURCE_TABLE); - table.setData_source_table(getDataSourceTable()); - return table; - } - - /** - * Returns a thrift {@link TDataSourceTable} structure for the data source table. - */ - private TDataSourceTable getDataSourceTable() { - return new TDataSourceTable(dataSource_, initString_); - } - - /** - * Returns true if the Hive {@link org.apache.hadoop.hive.metastore.api.Table} is a - * data source table, determined by checking for the existence of the - * TBL_PROP_DATA_SRC_NAME table property. - */ - public static boolean isDataSourceTable( - org.apache.hadoop.hive.metastore.api.Table msTbl) { - return msTbl.getParameters().containsKey(TBL_PROP_DATA_SRC_NAME); - } -}
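For reference, a minimal stand-alone sketch of the table-property convention DataSourceTable relies on. The property values and data source class name below are hypothetical placeholders (a real table gets equivalents of these five properties from CREATE TABLE ... PRODUCED BY DATA SOURCE), and the final check mirrors isDataSourceTable() above.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hive.metastore.api.Table;

public class DataSourceTablePropsSketch {
  public static void main(String[] args) {
    // Hypothetical property values for illustration only.
    Map<String, String> params = new HashMap<String, String>();
    params.put("__IMPALA_DATA_SOURCE_NAME", "example_src");             // TBL_PROP_DATA_SRC_NAME
    params.put("__IMPALA_DATA_SOURCE_LOCATION", "/tmp/example.jar");    // TBL_PROP_LOCATION
    params.put("__IMPALA_DATA_SOURCE_CLASS", "com.example.ExampleSrc"); // TBL_PROP_CLASS
    params.put("__IMPALA_DATA_SOURCE_API_VERSION", "V1");               // TBL_PROP_API_VER
    params.put("__IMPALA_DATA_SOURCE_INIT_STRING", "");                 // TBL_PROP_INIT_STRING

    Table msTbl = new Table();
    msTbl.setParameters(params);

    // Mirrors DataSourceTable.isDataSourceTable(msTbl): the name property
    // alone marks the table; load() then requires the other four and throws
    // TableLoadingException if any is missing.
    boolean isDataSrcTable =
        msTbl.getParameters().containsKey("__IMPALA_DATA_SOURCE_NAME");
    System.out.println("data source table: " + isDataSrcTable);
  }
}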

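A similar sketch for the ColumnStats.java hunk above, showing the Hive metastore ColumnStatisticsData union that ColumnStats.update() dispatches on. The stat values are made up, and the println stands in for the update() call itself.

import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;

public class ColumnStatsUnionSketch {
  public static void main(String[] args) {
    // Made-up stats for an integer-typed column; the metastore reports all
    // of TINYINT..BIGINT (and, per the comment above, TIMESTAMP) this way.
    LongColumnStatsData longStats = new LongColumnStatsData();
    longStats.setNumDVs(42L);
    longStats.setNumNulls(7L);

    // ColumnStatisticsData is a thrift union: exactly one field is set.
    ColumnStatisticsData statsData = new ColumnStatisticsData();
    statsData.setLongStats(longStats);

    // ColumnStats.update() keys its compatibility check on which union field
    // is set: a BIGINT column requires isSetLongStats(); otherwise the stats
    // stay at their "unknown" (-1) defaults and update() returns false.
    System.out.println("long stats set: " + statsData.isSetLongStats());
  }
}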