// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package com.cloudera.impala.catalog;

import java.util.HashMap;

import org.apache.hadoop.hive.metastore.api.StorageDescriptor;

import com.cloudera.impala.thrift.THdfsFileFormat;
import com.google.common.base.Preconditions;

/**
 * Static factory for building Hive metastore StorageDescriptor objects from an
 * Impala file format and row format.
 */
public class HiveStorageDescriptorFactory {
  // Utility class with only static members; not meant to be instantiated.
  private HiveStorageDescriptorFactory() {}

  /**
   * Creates and returns a Hive StorageDescriptor for the given FileFormat and RowFormat.
   * Currently supports creating StorageDescriptors for Parquet, Text, Sequence, Avro and
   * RC file.
   * TODO: Add support for HBase
   *
   * @param fileFormat the HDFS file format of the table; determines the input/output
   *     format classes and the SerDe library recorded in the descriptor.
   * @param rowFormat field/escape/line delimiters; only non-null delimiters are written
   *     into the SerDe parameters.
   * @return a new StorageDescriptor; never null.
   */
  public static StorageDescriptor createSd(THdfsFileFormat fileFormat,
      RowFormat rowFormat) {
    Preconditions.checkNotNull(fileFormat);
    Preconditions.checkNotNull(rowFormat);

    StorageDescriptor sd = new StorageDescriptor();
    sd.setSerdeInfo(new org.apache.hadoop.hive.metastore.api.SerDeInfo());
    sd.getSerdeInfo().setParameters(new HashMap<String, String>());
    // The compressed flag is not used to determine whether the table is compressed or
    // not. Instead, we use the input format or the filename.
    sd.setCompressed(false);
    HdfsFileFormat hdfsFileFormat = HdfsFileFormat.fromThrift(fileFormat);
    sd.setInputFormat(hdfsFileFormat.inputFormat());
    sd.setOutputFormat(hdfsFileFormat.outputFormat());
    sd.getSerdeInfo().setSerializationLib(hdfsFileFormat.serializationLib());

    if (rowFormat.getFieldDelimiter() != null) {
      // "serialization.format" mirrors the field delimiter by Hive convention.
      sd.getSerdeInfo().putToParameters(
          "serialization.format", rowFormat.getFieldDelimiter());
      sd.getSerdeInfo().putToParameters("field.delim", rowFormat.getFieldDelimiter());
    }
    if (rowFormat.getEscapeChar() != null) {
      sd.getSerdeInfo().putToParameters("escape.delim", rowFormat.getEscapeChar());
    }
    if (rowFormat.getLineDelimiter() != null) {
      sd.getSerdeInfo().putToParameters("line.delim", rowFormat.getLineDelimiter());
    }
    return sd;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/ImpaladCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/com/cloudera/impala/catalog/ImpaladCatalog.java deleted file mode 100644 index 02c9747..0000000 --- a/fe/src/main/java/com/cloudera/impala/catalog/ImpaladCatalog.java +++ /dev/null @@ -1,448 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
package com.cloudera.impala.catalog;

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;

import com.cloudera.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import com.cloudera.impala.common.ImpalaException;
import com.cloudera.impala.thrift.TCatalogObject;
import com.cloudera.impala.thrift.TCatalogObjectType;
import com.cloudera.impala.thrift.TDataSource;
import com.cloudera.impala.thrift.TDatabase;
import com.cloudera.impala.thrift.TFunction;
import com.cloudera.impala.thrift.TPrivilege;
import com.cloudera.impala.thrift.TRole;
import com.cloudera.impala.thrift.TTable;
import com.cloudera.impala.thrift.TUniqueId;
import com.cloudera.impala.thrift.TUpdateCatalogCacheRequest;
import com.cloudera.impala.thrift.TUpdateCatalogCacheResponse;

/**
 * Thread safe Catalog for an Impalad. The Impalad catalog can be updated either via
 * a StateStore heartbeat or by directly applying the result of a catalog operation to
 * the CatalogCache. All updates are applied using the updateCatalog() function.
 * Table metadata is loaded lazily. The CatalogServer initially broadcasts (via the
 * statestore) the known table names (as IncompleteTables). These table names are added
 * to the Impalad catalog cache and when one of the tables is accessed, the impalad will
 * make an RPC to the CatalogServer to request loading the complete table metadata.
 * In both cases, we need to ensure that work from one update is not "undone" by another
 * update. To handle this the ImpaladCatalog does the following:
 * - Tracks the overall catalog version last received in a state store heartbeat, this
 *   version is maintained by the catalog server and it is always guaranteed that
 *   this impalad's catalog will never contain any objects < than this version
 *   (any updates with a lower version number are ignored).
 * - For updated/new objects, check if the object already exists in the
 *   catalog cache. If it does, only apply the update if the catalog version is > the
 *   existing object's catalog version. Also keep a log of all dropped catalog objects
 *   (and the version they were dropped in). Before updating any object, check if it was
 *   dropped in a later version. If so, ignore the update.
 * - Before dropping any catalog object, see if the object already exists in the catalog
 *   cache. If it does, only drop the object if the version of the drop is > that
 *   object's catalog version.
 * The CatalogServiceId is also tracked to detect if a different instance of the catalog
 * service has been started, in which case a full topic update is required.
 */
public class ImpaladCatalog extends Catalog {
  private static final Logger LOG = Logger.getLogger(ImpaladCatalog.class);
  private static final TUniqueId INITIAL_CATALOG_SERVICE_ID = new TUniqueId(0L, 0L);

  // The last known Catalog Service ID. If the ID changes, it indicates the
  // CatalogServer has restarted.
  private TUniqueId catalogServiceId_ = INITIAL_CATALOG_SERVICE_ID;

  // The catalog version received in the last StateStore heartbeat. It is guaranteed
  // all objects in the catalog have at a minimum, this version. Because updates may
  // be applied out of band of a StateStore heartbeat, it is possible the catalog
  // contains some objects > than this version.
  private long lastSyncedCatalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;

  // Flag to determine if the Catalog is ready to accept user requests. See isReady().
  private final AtomicBoolean isReady_ = new AtomicBoolean(false);

  // Tracks modifications to this Impalad's catalog from direct updates to the cache.
  private final CatalogDeltaLog catalogDeltaLog_ = new CatalogDeltaLog();

  // Object that is used to synchronize on and signal when a catalog update is received.
  private final Object catalogUpdateEventNotifier_ = new Object();

  /**
   * C'tor used by tests that need to validate the ImpaladCatalog outside of the
   * CatalogServer.
   */
  public ImpaladCatalog() {
    super(false);
  }

  /**
   * Updates the internal Catalog based on the given TCatalogUpdateReq.
   * This method:
   * 1) Updates all databases in the Catalog
   * 2) Updates all tables, views, and functions in the Catalog
   * 3) Removes all dropped tables, views, and functions
   * 4) Removes all dropped databases
   *
   * This method is called once per statestore heartbeat and is guaranteed the same
   * object will not be in both the "updated" list and the "removed" list (it is
   * a detail handled by the statestore).
   * Catalog updates are ordered by the object type with the dependent objects coming
   * first. That is, database "foo" will always come before table "foo.bar".
   * Synchronized because updateCatalog() can be called by during a statestore update or
   * during a direct-DDL operation and catalogServiceId_ and lastSyncedCatalogVersion_
   * must be protected.
   *
   * @throws CatalogException if the catalog service ID changed, which forces the
   *     caller to request a full topic update.
   */
  public synchronized TUpdateCatalogCacheResponse updateCatalog(
      TUpdateCatalogCacheRequest req) throws CatalogException {
    // Check for changes in the catalog service ID.
    if (!catalogServiceId_.equals(req.getCatalog_service_id())) {
      boolean firstRun = catalogServiceId_.equals(INITIAL_CATALOG_SERVICE_ID);
      catalogServiceId_ = req.getCatalog_service_id();
      if (!firstRun) {
        // Throw an exception which will trigger a full topic update request.
        throw new CatalogException("Detected catalog service ID change. Aborting " +
            "updateCatalog()");
      }
    }

    // First process all updates. The CATALOG object carries the new overall catalog
    // version for this update; all other objects are applied individually.
    long newCatalogVersion = lastSyncedCatalogVersion_;
    for (TCatalogObject catalogObject: req.getUpdated_objects()) {
      if (catalogObject.getType() == TCatalogObjectType.CATALOG) {
        newCatalogVersion = catalogObject.getCatalog_version();
      } else {
        try {
          addCatalogObject(catalogObject);
        } catch (Exception e) {
          // A failure to apply one object should not abort the whole update.
          LOG.error("Error adding catalog object: " + e.getMessage(), e);
        }
      }
    }

    // Now remove all objects from the catalog. Removing a database before removing
    // its child tables/functions is fine. If that happens, the removal of the child
    // object will be a no-op.
    for (TCatalogObject catalogObject: req.getRemoved_objects()) {
      removeCatalogObject(catalogObject, newCatalogVersion);
    }
    lastSyncedCatalogVersion_ = newCatalogVersion;
    // Cleanup old entries in the log.
    catalogDeltaLog_.garbageCollect(lastSyncedCatalogVersion_);
    isReady_.set(true);

    // Notify all the threads waiting on a catalog update.
    synchronized (catalogUpdateEventNotifier_) {
      catalogUpdateEventNotifier_.notifyAll();
    }

    return new TUpdateCatalogCacheResponse(catalogServiceId_);
  }

  /**
   * Causes the calling thread to wait until a catalog update notification has been sent
   * or the given timeout has been reached. A timeout value of 0 indicates an indefinite
   * wait. Does not protect against spurious wakeups, so this should be called in a loop.
   */
  public void waitForCatalogUpdate(long timeoutMs) {
    synchronized (catalogUpdateEventNotifier_) {
      try {
        catalogUpdateEventNotifier_.wait(timeoutMs);
      } catch (InterruptedException e) {
        // Ignore; callers are expected to re-check their condition and wait again.
      }
    }
  }

  /**
   * Returns the Table object for the given dbName/tableName. Returns null
   * if the table does not exist. Will throw a TableLoadingException if the table's
   * metadata was not able to be loaded successfully and DatabaseNotFoundException
   * if the parent database does not exist.
   */
  @Override
  public Table getTable(String dbName, String tableName)
      throws CatalogException {
    Table table = super.getTable(dbName, tableName);
    if (table == null) return null;

    if (table.isLoaded() && table instanceof IncompleteTable) {
      // If there were problems loading this table's metadata, throw an exception
      // when it is accessed.
      ImpalaException cause = ((IncompleteTable) table).getCause();
      if (cause instanceof TableLoadingException) throw (TableLoadingException) cause;
      throw new TableLoadingException("Missing metadata for table: " + tableName, cause);
    }
    return table;
  }

  /**
   * Returns the HDFS path where the metastore would create the given table. If the table
   * has a "location" set, that will be returned. Otherwise the path will be resolved
   * based on the location of the parent database. The metastore folder hierarchy is:
   * <warehouse directory>/<db name>.db/<table name>
   * Except for items in the default database which will be:
   * <warehouse directory>/<table name>
   * This method handles both of these cases.
   */
  public Path getTablePath(org.apache.hadoop.hive.metastore.api.Table msTbl)
      throws NoSuchObjectException, MetaException, TException {
    try (MetaStoreClient msClient = getMetaStoreClient()) {
      // If the table did not have its path set, build the path based on the
      // location property of the parent database.
      if (msTbl.getSd().getLocation() == null || msTbl.getSd().getLocation().isEmpty()) {
        String dbLocation =
            msClient.getHiveClient().getDatabase(msTbl.getDbName()).getLocationUri();
        return new Path(dbLocation, msTbl.getTableName().toLowerCase());
      } else {
        return new Path(msTbl.getSd().getLocation());
      }
    }
  }

  /**
   * Adds the given TCatalogObject to the catalog cache. The update may be ignored
   * (considered out of date) if:
   * 1) An item exists in the catalog cache with a version > than the given
   *    TCatalogObject's version.
   * 2) The catalogDeltaLog_ contains an entry for this object with a version
   *    > than the given TCatalogObject's version.
   */
  private void addCatalogObject(TCatalogObject catalogObject)
      throws TableLoadingException, DatabaseNotFoundException {
    // This item is out of date and should not be applied to the catalog.
    if (catalogDeltaLog_.wasObjectRemovedAfter(catalogObject)) {
      LOG.debug(String.format("Skipping update because a matching object was removed " +
          "in a later catalog version: %s", catalogObject));
      return;
    }

    switch(catalogObject.getType()) {
      case DATABASE:
        addDb(catalogObject.getDb(), catalogObject.getCatalog_version());
        break;
      case TABLE:
      case VIEW:
        addTable(catalogObject.getTable(), catalogObject.getCatalog_version());
        break;
      case FUNCTION:
        addFunction(catalogObject.getFn(), catalogObject.getCatalog_version());
        break;
      case DATA_SOURCE:
        addDataSource(catalogObject.getData_source(), catalogObject.getCatalog_version());
        break;
      case ROLE:
        Role role = Role.fromThrift(catalogObject.getRole());
        role.setCatalogVersion(catalogObject.getCatalog_version());
        authPolicy_.addRole(role);
        break;
      case PRIVILEGE:
        RolePrivilege privilege =
            RolePrivilege.fromThrift(catalogObject.getPrivilege());
        privilege.setCatalogVersion(catalogObject.getCatalog_version());
        try {
          authPolicy_.addPrivilege(privilege);
        } catch (CatalogException e) {
          LOG.error("Error adding privilege: ", e);
        }
        break;
      case HDFS_CACHE_POOL:
        HdfsCachePool cachePool = new HdfsCachePool(catalogObject.getCache_pool());
        cachePool.setCatalogVersion(catalogObject.getCatalog_version());
        hdfsCachePools_.add(cachePool);
        break;
      default:
        throw new IllegalStateException(
            "Unexpected TCatalogObjectType: " + catalogObject.getType());
    }
  }

  /**
   * Removes the matching TCatalogObject from the catalog, if one exists and its
   * catalog version is < the catalog version of this drop operation.
   * Note that drop operations that come from statestore heartbeats always have a
   * version of 0. To determine the drop version for statestore updates,
   * the catalog version from the current update is used. This is okay because there
   * can never be a catalog update from the statestore that contains a drop
   * and an addition of the same object. For more details on how drop
   * versioning works, see CatalogServerCatalog.java
   */
  private void removeCatalogObject(TCatalogObject catalogObject,
      long currentCatalogUpdateVersion) {
    // The TCatalogObject associated with a drop operation from a state store
    // heartbeat will always have a version of zero. Because no update from
    // the state store can contain both a drop and an addition of the same object,
    // we can assume the drop version is the current catalog version of this update.
    // If the TCatalogObject contains a version that != 0, it indicates the drop
    // came from a direct update.
    long dropCatalogVersion = catalogObject.getCatalog_version() == 0 ?
        currentCatalogUpdateVersion : catalogObject.getCatalog_version();

    switch(catalogObject.getType()) {
      case DATABASE:
        removeDb(catalogObject.getDb(), dropCatalogVersion);
        break;
      case TABLE:
      case VIEW:
        removeTable(catalogObject.getTable(), dropCatalogVersion);
        break;
      case FUNCTION:
        removeFunction(catalogObject.getFn(), dropCatalogVersion);
        break;
      case DATA_SOURCE:
        removeDataSource(catalogObject.getData_source(), dropCatalogVersion);
        break;
      case ROLE:
        removeRole(catalogObject.getRole(), dropCatalogVersion);
        break;
      case PRIVILEGE:
        removePrivilege(catalogObject.getPrivilege(), dropCatalogVersion);
        break;
      case HDFS_CACHE_POOL:
        HdfsCachePool existingItem =
            hdfsCachePools_.get(catalogObject.getCache_pool().getPool_name());
        // BUGFIX: the previous code dereferenced existingItem without a null check
        // (NPE when the pool is not cached) and compared against the raw thrift
        // version, which is always 0 for statestore drops. Apply the same
        // drop-versioning rule documented on this class and used by the other
        // remove*() methods: drop only if the cached item is older than the drop.
        if (existingItem != null &&
            existingItem.getCatalogVersion() < dropCatalogVersion) {
          hdfsCachePools_.remove(catalogObject.getCache_pool().getPool_name());
        }
        break;
      default:
        throw new IllegalStateException(
            "Unexpected TCatalogObjectType: " + catalogObject.getType());
    }

    if (catalogObject.getCatalog_version() > lastSyncedCatalogVersion_) {
      catalogDeltaLog_.addRemovedObject(catalogObject);
    }
  }

  // Adds the database if it is new or newer than the cached entry.
  private void addDb(TDatabase thriftDb, long catalogVersion) {
    Db existingDb = getDb(thriftDb.getDb_name());
    if (existingDb == null ||
        existingDb.getCatalogVersion() < catalogVersion) {
      Db newDb = Db.fromTDatabase(thriftDb, this);
      newDb.setCatalogVersion(catalogVersion);
      addDb(newDb);
    }
  }

  // Adds the table to its parent database; a no-op if the database is missing.
  private void addTable(TTable thriftTable, long catalogVersion)
      throws TableLoadingException {
    Db db = getDb(thriftTable.db_name);
    if (db == null) {
      LOG.debug("Parent database of table does not exist: " +
          thriftTable.db_name + "." + thriftTable.tbl_name);
      return;
    }

    Table newTable = Table.fromThrift(db, thriftTable);
    newTable.setCatalogVersion(catalogVersion);
    db.addTable(newTable);
  }

  // Adds the function if it is new or newer than the cached entry; a no-op if the
  // parent database is missing.
  private void addFunction(TFunction fn, long catalogVersion) {
    Function function = Function.fromThrift(fn);
    function.setCatalogVersion(catalogVersion);
    Db db = getDb(function.getFunctionName().getDb());
    if (db == null) {
      LOG.debug("Parent database of function does not exist: " + function.getName());
      return;
    }
    Function existingFn = db.getFunction(fn.getSignature());
    if (existingFn == null ||
        existingFn.getCatalogVersion() < catalogVersion) {
      db.addFunction(function);
    }
  }

  private void addDataSource(TDataSource thrift, long catalogVersion) {
    DataSource dataSource = DataSource.fromThrift(thrift);
    dataSource.setCatalogVersion(catalogVersion);
    addDataSource(dataSource);
  }

  private void removeDataSource(TDataSource thrift, long dropCatalogVersion) {
    removeDataSource(thrift.getName());
  }

  // Removes the database if it exists and is older than the drop version.
  private void removeDb(TDatabase thriftDb, long dropCatalogVersion) {
    Db db = getDb(thriftDb.getDb_name());
    if (db != null && db.getCatalogVersion() < dropCatalogVersion) {
      removeDb(db.getName());
    }
  }

  // Removes the table if it exists and is older than the drop version.
  private void removeTable(TTable thriftTable, long dropCatalogVersion) {
    Db db = getDb(thriftTable.db_name);
    // The parent database doesn't exist, nothing to do.
    if (db == null) return;

    Table table = db.getTable(thriftTable.getTbl_name());
    if (table != null && table.getCatalogVersion() < dropCatalogVersion) {
      db.removeTable(thriftTable.tbl_name);
    }
  }

  // Removes the function if it exists and is older than the drop version.
  private void removeFunction(TFunction thriftFn, long dropCatalogVersion) {
    Db db = getDb(thriftFn.name.getDb_name());
    // The parent database doesn't exist, nothing to do.
    if (db == null) return;

    // If the function exists and it has a catalog version less than the
    // version of the drop, remove the function.
    Function fn = db.getFunction(thriftFn.getSignature());
    if (fn != null && fn.getCatalogVersion() < dropCatalogVersion) {
      db.removeFunction(thriftFn.getSignature());
    }
  }

  // Removes the role if it exists and is older than the drop version.
  private void removeRole(TRole thriftRole, long dropCatalogVersion) {
    Role existingRole = authPolicy_.getRole(thriftRole.getRole_name());
    // If the role exists and it has a catalog version less than the
    // version of the drop, remove the role.
    if (existingRole != null && existingRole.getCatalogVersion() < dropCatalogVersion) {
      authPolicy_.removeRole(thriftRole.getRole_name());
    }
  }

  // Removes the privilege if it exists and is older than the drop version.
  private void removePrivilege(TPrivilege thriftPrivilege, long dropCatalogVersion) {
    Role role = authPolicy_.getRole(thriftPrivilege.getRole_id());
    if (role == null) return;
    RolePrivilege existingPrivilege =
        role.getPrivilege(thriftPrivilege.getPrivilege_name());
    // If the privilege exists and it has a catalog version less than the
    // version of the drop, remove the privilege.
    if (existingPrivilege != null &&
        existingPrivilege.getCatalogVersion() < dropCatalogVersion) {
      role.removePrivilege(thriftPrivilege.getPrivilege_name());
    }
  }

  /**
   * Returns true if the ImpaladCatalog is ready to accept requests (has
   * received and processed a valid catalog topic update from the StateStore),
   * false otherwise.
   */
  public boolean isReady() { return isReady_.get(); }

  // Only used for testing.
  public void setIsReady(boolean isReady) { isReady_.set(isReady); }
  public AuthorizationPolicy getAuthPolicy() { return authPolicy_; }
}
package com.cloudera.impala.catalog;

import java.util.List;
import java.util.Set;

import org.apache.hadoop.hive.metastore.IMetaStoreClient;

import com.cloudera.impala.common.ImpalaException;
import com.cloudera.impala.common.JniUtil;
import com.cloudera.impala.thrift.TCatalogObjectType;
import com.cloudera.impala.thrift.TErrorCode;
import com.cloudera.impala.thrift.TStatus;
import com.cloudera.impala.thrift.TTable;
import com.cloudera.impala.thrift.TTableDescriptor;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;

/**
 * Represents a table with incomplete metadata. The metadata may be incomplete because
 * it has not yet been loaded or because of errors encountered during the loading
 * process.
 */
public class IncompleteTable extends Table {
  // The cause for the incomplete metadata. If there is no cause given (cause_ = null),
  // then this is assumed to be an uninitialized table (table that does not have
  // its metadata loaded).
  private ImpalaException cause_;

  // Private: instances are created only via the static factory methods below.
  private IncompleteTable(TableId id, Db db, String name,
      ImpalaException cause) {
    super(id, null, db, name, null);
    cause_ = cause;
  }

  /**
   * Returns the cause (ImpalaException) which led to this table's metadata being
   * incomplete. Null if the table is merely uninitialized (load never attempted).
   */
  public ImpalaException getCause() { return cause_; }

  /**
   * See comment on cause_.
   * Note the inversion: an IncompleteTable counts as "loaded" only when a load was
   * attempted and FAILED (cause_ set); a table awaiting its first load reports false.
   */
  @Override
  public boolean isLoaded() { return cause_ != null; }

  @Override
  public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.TABLE; }

  // An incomplete table can never be serialized into a descriptor for query execution;
  // reaching this is a programming error, surfaced with the original load failure.
  @Override
  public TTableDescriptor toThriftDescriptor(Set<Long> referencedPartitions) {
    throw new IllegalStateException(cause_);
  }

  // Loading an IncompleteTable always fails: it re-throws the recorded cause so the
  // original error is what callers see.
  @Override
  public void load(boolean reuseMetadata, IMetaStoreClient client,
      org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
    if (cause_ instanceof TableLoadingException) {
      throw (TableLoadingException) cause_;
    } else {
      throw new TableLoadingException("Table metadata incomplete: ", cause_);
    }
  }

  // Serializes this table, embedding the failure (if any) as an INTERNAL_ERROR load
  // status whose messages carry both the exception text and its stack trace.
  @Override
  public TTable toThrift() {
    TTable table = new TTable(db_.getName(), name_);
    table.setId(id_.asInt());
    if (cause_ != null) {
      table.setLoad_status(new TStatus(TErrorCode.INTERNAL_ERROR,
          Lists.newArrayList(JniUtil.throwableToString(cause_),
              JniUtil.throwableToStackTrace(cause_))));
    }
    return table;
  }

  @Override
  protected void loadFromThrift(TTable thriftTable) throws TableLoadingException {
    if (thriftTable.isSetLoad_status()) {
      // Since the load status is set, it indicates the table is incomplete due to
      // an error loading the table metadata. The error message in the load status
      // should provide details on why. By convention, the final error message should
      // be the remote (Catalog Server) call stack. This shouldn't be displayed to the
      // user under normal circumstances, but needs to be recorded somewhere so append
      // it to the call stack of the local TableLoadingException created here.
      // TODO: Provide a mechanism (query option?) to optionally allow returning more
      // detailed errors (including the full call stack(s)) to the user.
      List<String> errorMsgs = thriftTable.getLoad_status().getError_msgs();
      String callStackStr = "<None available>";
      // The remote call stack, when present, is the last element (see toThrift()).
      if (errorMsgs.size() > 1) callStackStr = errorMsgs.remove(errorMsgs.size() - 1);

      String errorMsg = Joiner.on("\n").join(errorMsgs);
      // The errorMsg will always be prefixed with "ExceptionClassName: ". Since we treat
      // all errors as TableLoadingExceptions, the prefix "TableLoadingException" is
      // redundant and can be stripped out.
      errorMsg = errorMsg.replaceFirst("^TableLoadingException: ", "");
      TableLoadingException loadingException = new TableLoadingException(errorMsg);
      // Splice a synthetic frame carrying the catalogd stack onto the local trace.
      List<StackTraceElement> stackTrace =
          Lists.newArrayList(loadingException.getStackTrace());
      stackTrace.add(new StackTraceElement("========",
          "<Remote stack trace on catalogd>: " + callStackStr, "", -1));
      loadingException.setStackTrace(
          stackTrace.toArray(new StackTraceElement[stackTrace.size()]));
      this.cause_ = loadingException;
    }
  }

  /** Creates an IncompleteTable whose metadata simply has not been loaded yet. */
  public static IncompleteTable createUninitializedTable(TableId id, Db db,
      String name) {
    return new IncompleteTable(id, db, name, null);
  }

  /** Creates an IncompleteTable recording a metadata load failure (cause e). */
  public static IncompleteTable createFailedMetadataLoadTable(TableId id, Db db,
      String name, ImpalaException e) {
    return new IncompleteTable(id, db, name, e);
  }
}
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.catalog; - -import com.cloudera.impala.thrift.TColumn; - -/** - * Describes a Kudu column mapped to a Hive column (as described in the metastore). - * This class extends Column with Kudu-specific information about whether it is part of a primary - * key, and whether it is nullable. - */ -public class KuduColumn extends Column { - private final boolean isKey_; - private final boolean isNullable_; - - public KuduColumn(String name, boolean isKey, boolean isNullable, Type type, - String comment, int position) { - super(name, type, comment, position); - isKey_ = isKey; - isNullable_ = isNullable; - } - - public boolean isKey() { return isKey_; } - public boolean isNullable() { return isNullable_; } - - @Override - public TColumn toThrift() { - TColumn colDesc = new TColumn(name_, type_.toThrift()); - if (comment_ != null) colDesc.setComment(comment_); - colDesc.setCol_stats(getStats().toThrift()); - colDesc.setPosition(position_); - colDesc.setIs_kudu_column(true); - colDesc.setIs_key(isKey_); - colDesc.setIs_nullable(isNullable_); - return colDesc; - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/KuduTable.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/KuduTable.java 
b/fe/src/main/java/com/cloudera/impala/catalog/KuduTable.java deleted file mode 100644 index 71d897d..0000000 --- a/fe/src/main/java/com/cloudera/impala/catalog/KuduTable.java +++ /dev/null @@ -1,278 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package com.cloudera.impala.catalog; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.xml.bind.DatatypeConverter; - -import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; -import org.apache.log4j.Logger; -import org.apache.kudu.client.KuduClient; -import org.apache.kudu.client.LocatedTablet; - -import com.cloudera.impala.common.ImpalaRuntimeException; -import com.cloudera.impala.thrift.TCatalogObjectType; -import com.cloudera.impala.thrift.TColumn; -import com.cloudera.impala.thrift.TKuduTable; -import com.cloudera.impala.thrift.TResultSet; -import com.cloudera.impala.thrift.TResultSetMetadata; -import com.cloudera.impala.thrift.TTable; -import com.cloudera.impala.thrift.TTableDescriptor; -import com.cloudera.impala.thrift.TTableType; -import com.cloudera.impala.util.KuduUtil; -import com.cloudera.impala.util.TResultRowBuilder; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -/** - * Impala representation of a Kudu table. - * - * The Kudu-related metadata is stored in the Metastore table's table properties. - */ -public class KuduTable extends Table { - private static final Logger LOG = Logger.getLogger(Table.class); - - // Alias to the string key that identifies the storage handler for Kudu tables. - public static final String KEY_STORAGE_HANDLER = - hive_metastoreConstants.META_TABLE_STORAGE; - - // Key to access the table name from the table properties - public static final String KEY_TABLE_NAME = "kudu.table_name"; - - // Key to access the columns used to build the (composite) key of the table. - // The order of the keys is important. 
- public static final String KEY_KEY_COLUMNS = "kudu.key_columns"; - - // Key to access the master address from the table properties. Error handling for - // this string is done in the KuduClient library. - // TODO we should have something like KuduConfig.getDefaultConfig() - public static final String KEY_MASTER_ADDRESSES = "kudu.master_addresses"; - - // Kudu specific value for the storage handler table property keyed by - // KEY_STORAGE_HANDLER. - public static final String KUDU_STORAGE_HANDLER = - "com.cloudera.kudu.hive.KuduStorageHandler"; - - // Key to specify the number of tablet replicas. - // TODO(KUDU): Allow modification in alter table. - public static final String KEY_TABLET_REPLICAS = "kudu.num_tablet_replicas"; - - public static final long KUDU_RPC_TIMEOUT_MS = 50000; - - // The name of the table in Kudu. - private String kuduTableName_; - - // Comma separated list of Kudu master hosts with optional ports. - private String kuduMasters_; - - // The set of columns that are key columns in Kudu. 
- private ImmutableList<String> kuduKeyColumnNames_; - - protected KuduTable(TableId id, org.apache.hadoop.hive.metastore.api.Table msTable, - Db db, String name, String owner) { - super(id, msTable, db, name, owner); - } - - public TKuduTable getKuduTable() { - TKuduTable tbl = new TKuduTable(); - tbl.setKey_columns(Preconditions.checkNotNull(kuduKeyColumnNames_)); - tbl.setMaster_addresses(Lists.newArrayList(kuduMasters_.split(","))); - tbl.setTable_name(kuduTableName_); - return tbl; - } - - @Override - public TTableDescriptor toThriftDescriptor(Set<Long> referencedPartitions) { - TTableDescriptor desc = new TTableDescriptor(id_.asInt(), TTableType.KUDU_TABLE, - getTColumnDescriptors(), numClusteringCols_, kuduTableName_, db_.getName()); - desc.setKuduTable(getKuduTable()); - return desc; - } - - @Override - public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.TABLE; } - - @Override - public String getStorageHandlerClassName() { return KUDU_STORAGE_HANDLER; } - - /** - * Returns the columns in the order they have been created - */ - @Override - public ArrayList<Column> getColumnsInHiveOrder() { return getColumns(); } - - public static boolean isKuduTable(org.apache.hadoop.hive.metastore.api.Table mstbl) { - return KUDU_STORAGE_HANDLER.equals(mstbl.getParameters().get(KEY_STORAGE_HANDLER)); - } - - /** - * Load the columns from the schema list - */ - private void loadColumns(List<FieldSchema> schema, IMetaStoreClient client, - Set<String> keyColumns) throws TableLoadingException { - - if (keyColumns.size() == 0 || keyColumns.size() > schema.size()) { - throw new TableLoadingException(String.format("Kudu tables must have at least one" - + "key column (had %d), and no more key columns than there are table columns " - + "(had %d).", keyColumns.size(), schema.size())); - } - - clearColumns(); - Set<String> columnNames = Sets.newHashSet(); - int pos = 0; - for (FieldSchema field: schema) { - com.cloudera.impala.catalog.Type type = 
parseColumnType(field); - // TODO(kudu-merge): Check for decimal types? - boolean isKey = keyColumns.contains(field.getName()); - KuduColumn col = new KuduColumn(field.getName(), isKey, !isKey, type, - field.getComment(), pos); - columnNames.add(col.getName()); - addColumn(col); - ++pos; - } - - if (!columnNames.containsAll(keyColumns)) { - throw new TableLoadingException(String.format("Some key columns were not found in" - + " the set of columns. List of column names: %s, List of key column names:" - + " %s", Iterables.toString(columnNames), Iterables.toString(keyColumns))); - } - - kuduKeyColumnNames_ = ImmutableList.copyOf(keyColumns); - - loadAllColumnStats(client); - } - - @Override - public void load(boolean reuseMetadata, IMetaStoreClient client, - org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException { - // TODO handle 'reuseMetadata' - if (getMetaStoreTable() == null || !tableParamsAreValid(msTbl.getParameters())) { - throw new TableLoadingException(String.format( - "Cannot load Kudu table %s, table is corrupt.", name_)); - } - - msTable_ = msTbl; - kuduTableName_ = msTbl.getParameters().get(KEY_TABLE_NAME); - kuduMasters_ = msTbl.getParameters().get(KEY_MASTER_ADDRESSES); - - String keyColumnsProp = Preconditions.checkNotNull(msTbl.getParameters() - .get(KEY_KEY_COLUMNS).toLowerCase(), "'kudu.key_columns' cannot be null."); - Set<String> keyColumns = KuduUtil.parseKeyColumns(keyColumnsProp); - - // Load the rest of the data from the table parameters directly - loadColumns(msTbl.getSd().getCols(), client, keyColumns); - - numClusteringCols_ = 0; - - // Get row count from stats - numRows_ = getRowCount(getMetaStoreTable().getParameters()); - } - - @Override - public TTable toThrift() { - TTable table = super.toThrift(); - table.setTable_type(TTableType.KUDU_TABLE); - table.setKudu_table(getKuduTable()); - return table; - } - - @Override - protected void loadFromThrift(TTable thriftTable) throws TableLoadingException { - 
super.loadFromThrift(thriftTable); - TKuduTable tkudu = thriftTable.getKudu_table(); - kuduTableName_ = tkudu.getTable_name(); - kuduMasters_ = Joiner.on(',').join(tkudu.getMaster_addresses()); - kuduKeyColumnNames_ = ImmutableList.copyOf(tkudu.getKey_columns()); - } - - public String getKuduTableName() { return kuduTableName_; } - public String getKuduMasterAddresses() { return kuduMasters_; } - public int getNumKeyColumns() { return kuduKeyColumnNames_.size(); } - - /** - * Returns true if all required parameters are present in the given table properties - * map. - * TODO(kudu-merge) Return a more specific error string. - */ - public static boolean tableParamsAreValid(Map<String, String> params) { - return params.get(KEY_TABLE_NAME) != null && params.get(KEY_TABLE_NAME).length() > 0 - && params.get(KEY_MASTER_ADDRESSES) != null - && params.get(KEY_MASTER_ADDRESSES).length() > 0 - && params.get(KEY_KEY_COLUMNS) != null - && params.get(KEY_KEY_COLUMNS).length() > 0; - } - - /** - * The number of nodes is not know ahead of time and will be updated during computeStats - * in the scan node. 
- */ - public int getNumNodes() { return -1; } - - public List<String> getKuduKeyColumnNames() { return kuduKeyColumnNames_; } - - public TResultSet getTableStats() throws ImpalaRuntimeException { - TResultSet result = new TResultSet(); - TResultSetMetadata resultSchema = new TResultSetMetadata(); - result.setSchema(resultSchema); - - resultSchema.addToColumns(new TColumn("# Rows", Type.INT.toThrift())); - resultSchema.addToColumns(new TColumn("Start Key", Type.STRING.toThrift())); - resultSchema.addToColumns(new TColumn("Stop Key", Type.STRING.toThrift())); - resultSchema.addToColumns(new TColumn("Leader Replica", Type.STRING.toThrift())); - resultSchema.addToColumns(new TColumn("# Replicas", Type.INT.toThrift())); - - try (KuduClient client = new KuduClient.KuduClientBuilder( - getKuduMasterAddresses()).build()) { - org.apache.kudu.client.KuduTable kuduTable = client.openTable(kuduTableName_); - List<LocatedTablet> tablets = - kuduTable.getTabletsLocations(KUDU_RPC_TIMEOUT_MS); - for (LocatedTablet tab: tablets) { - TResultRowBuilder builder = new TResultRowBuilder(); - builder.add("-1"); // The Kudu client API doesn't expose tablet row counts. - builder.add(DatatypeConverter.printHexBinary( - tab.getPartition().getPartitionKeyStart())); - builder.add(DatatypeConverter.printHexBinary( - tab.getPartition().getPartitionKeyEnd())); - LocatedTablet.Replica leader = tab.getLeaderReplica(); - if (leader == null) { - // Leader might be null, if it is not yet available (e.g. 
during - // leader election in Kudu) - builder.add("Leader n/a"); - } else { - builder.add(leader.getRpcHost() + ":" + leader.getRpcPort().toString()); - } - builder.add(tab.getReplicas().size()); - result.addToRows(builder.get()); - } - - } catch (Exception e) { - throw new ImpalaRuntimeException("Could not communicate with Kudu.", e); - } - return result; - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/MapType.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/MapType.java b/fe/src/main/java/com/cloudera/impala/catalog/MapType.java deleted file mode 100644 index 8bc7b05..0000000 --- a/fe/src/main/java/com/cloudera/impala/catalog/MapType.java +++ /dev/null @@ -1,80 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.catalog; - -import org.apache.commons.lang3.StringUtils; - -import com.cloudera.impala.thrift.TColumnType; -import com.cloudera.impala.thrift.TTypeNode; -import com.cloudera.impala.thrift.TTypeNodeType; -import com.google.common.base.Preconditions; - -/** - * Describes a MAP type. 
MAP types have a scalar key and an arbitrarily-typed value. - */ -public class MapType extends Type { - private final Type keyType_; - private final Type valueType_; - - public MapType(Type keyType, Type valueType) { - Preconditions.checkNotNull(keyType); - Preconditions.checkNotNull(valueType); - keyType_ = keyType; - valueType_ = valueType; - } - - public Type getKeyType() { return keyType_; } - public Type getValueType() { return valueType_; } - - @Override - public boolean equals(Object other) { - if (!(other instanceof MapType)) return false; - MapType otherMapType = (MapType) other; - return otherMapType.keyType_.equals(keyType_) && - otherMapType.valueType_.equals(valueType_); - } - - @Override - public String toSql(int depth) { - if (depth >= MAX_NESTING_DEPTH) return "MAP<...>"; - return String.format("MAP<%s,%s>", - keyType_.toSql(depth + 1), valueType_.toSql(depth + 1)); - } - - @Override - protected String prettyPrint(int lpad) { - String leftPadding = StringUtils.repeat(' ', lpad); - if (valueType_.isScalarType()) return leftPadding + toSql(); - // Pass in the padding to make sure nested fields are aligned properly, - // even if we then strip the top-level padding. 
- String structStr = valueType_.prettyPrint(lpad); - structStr = structStr.substring(lpad); - return String.format("%sMAP<%s,%s>", leftPadding, keyType_.toSql(), structStr); - } - - @Override - public void toThrift(TColumnType container) { - TTypeNode node = new TTypeNode(); - container.types.add(node); - Preconditions.checkNotNull(keyType_); - Preconditions.checkNotNull(valueType_); - node.setType(TTypeNodeType.MAP); - keyType_.toThrift(container); - valueType_.toThrift(container); - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/MetaStoreClientPool.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/MetaStoreClientPool.java b/fe/src/main/java/com/cloudera/impala/catalog/MetaStoreClientPool.java deleted file mode 100644 index 40eb4cf..0000000 --- a/fe/src/main/java/com/cloudera/impala/catalog/MetaStoreClientPool.java +++ /dev/null @@ -1,189 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package com.cloudera.impala.catalog; - -import java.util.concurrent.ConcurrentLinkedQueue; - -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaHook; -import org.apache.hadoop.hive.metastore.HiveMetaHookLoader; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; -import org.apache.log4j.Logger; - -import com.google.common.base.Preconditions; - -/** - * Manages a pool of RetryingMetaStoreClient connections. If the connection pool is empty - * a new client is created and added to the pool. The idle pool can expand till a maximum - * size of MAX_HMS_CONNECTION_POOL_SIZE, beyond which the connections are closed. - */ -public class MetaStoreClientPool { - // Key for config option read from hive-site.xml - private static final String HIVE_METASTORE_CNXN_DELAY_MS_CONF = - "impala.catalog.metastore.cnxn.creation.delay.ms"; - private static final int DEFAULT_HIVE_METASTORE_CNXN_DELAY_MS_CONF = 0; - // Maximum number of idle metastore connections in the connection pool at any point. - private static final int MAX_HMS_CONNECTION_POOL_SIZE = 32; - // Number of milliseconds to sleep between creation of HMS connections. Used to debug - // IMPALA-825. - private final int clientCreationDelayMs_; - - private static final Logger LOG = Logger.getLogger(MetaStoreClientPool.class); - - private final ConcurrentLinkedQueue<MetaStoreClient> clientPool_ = - new ConcurrentLinkedQueue<MetaStoreClient>(); - private Boolean poolClosed_ = false; - private final Object poolCloseLock_ = new Object(); - private final HiveConf hiveConf_; - - // Required for creating an instance of RetryingMetaStoreClient. 
- private static final HiveMetaHookLoader dummyHookLoader = new HiveMetaHookLoader() { - @Override - public HiveMetaHook getHook(org.apache.hadoop.hive.metastore.api.Table tbl) - throws MetaException { - return null; - } - }; - - /** - * A wrapper around the RetryingMetaStoreClient that manages interactions with the - * connection pool. This implements the AutoCloseable interface and hence the callers - * should use the try-with-resources statement while creating an instance. - */ - public class MetaStoreClient implements AutoCloseable { - private final IMetaStoreClient hiveClient_; - private boolean isInUse_; - - private MetaStoreClient(HiveConf hiveConf) { - try { - LOG.debug("Creating MetaStoreClient. Pool Size = " + clientPool_.size()); - hiveClient_ = RetryingMetaStoreClient.getProxy(hiveConf, dummyHookLoader, - HiveMetaStoreClient.class.getName()); - } catch (Exception e) { - // Turn in to an unchecked exception - throw new IllegalStateException(e); - } - isInUse_ = false; - } - - /** - * Returns the internal RetryingMetaStoreClient object. - */ - public IMetaStoreClient getHiveClient() { - return hiveClient_; - } - - /** - * Returns this client back to the connection pool. If the connection pool has been - * closed, just close the Hive client connection. - */ - @Override - public void close() { - Preconditions.checkState(isInUse_); - isInUse_ = false; - // Ensure the connection isn't returned to the pool if the pool has been closed - // or if the number of connections in the pool exceeds MAX_HMS_CONNECTION_POOL_SIZE. - // This lock is needed to ensure proper behavior when a thread reads poolClosed - // is false, but a call to pool.close() comes in immediately afterward. 
- synchronized (poolCloseLock_) { - if (poolClosed_ || clientPool_.size() >= MAX_HMS_CONNECTION_POOL_SIZE) { - hiveClient_.close(); - } else { - clientPool_.offer(this); - } - } - } - - // Marks this client as in use - private void markInUse() { - Preconditions.checkState(!isInUse_); - isInUse_ = true; - } - } - - public MetaStoreClientPool(int initialSize) { - this(initialSize, new HiveConf(MetaStoreClientPool.class)); - } - - public MetaStoreClientPool(int initialSize, HiveConf hiveConf) { - hiveConf_ = hiveConf; - clientCreationDelayMs_ = hiveConf_.getInt(HIVE_METASTORE_CNXN_DELAY_MS_CONF, - DEFAULT_HIVE_METASTORE_CNXN_DELAY_MS_CONF); - addClients(initialSize); - } - - /** - * Add numClients to the client pool. - */ - public void addClients(int numClients) { - for (int i = 0; i < numClients; ++i) { - clientPool_.add(new MetaStoreClient(hiveConf_)); - } - } - - /** - * Gets a client from the pool. If the pool is empty a new client is created. - */ - public MetaStoreClient getClient() { - // The MetaStoreClient c'tor relies on knowing the Hadoop version by asking - // org.apache.hadoop.util.VersionInfo. The VersionInfo class relies on opening - // the 'common-version-info.properties' file as a resource from hadoop-common*.jar - // using the Thread's context classloader. If necessary, set the Thread's context - // classloader, otherwise VersionInfo will fail in it's c'tor. - if (Thread.currentThread().getContextClassLoader() == null) { - Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader()); - } - - MetaStoreClient client = clientPool_.poll(); - // The pool was empty so create a new client and return that. - // Serialize client creation to defend against possible race conditions accessing - // local Kerberos state (see IMPALA-825). 
- if (client == null) { - synchronized (this) { - try { - Thread.sleep(clientCreationDelayMs_); - } catch (InterruptedException e) { - /* ignore */ - } - client = new MetaStoreClient(hiveConf_); - } - } - client.markInUse(); - return client; - } - - /** - * Removes all items from the connection pool and closes all Hive Meta Store client - * connections. Can be called multiple times. - */ - public void close() { - // Ensure no more items get added to the pool once close is called. - synchronized (poolCloseLock_) { - if (poolClosed_) { return; } - poolClosed_ = true; - } - - MetaStoreClient client = null; - while ((client = clientPool_.poll()) != null) { - client.getHiveClient().close(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/PartitionNotFoundException.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/PartitionNotFoundException.java b/fe/src/main/java/com/cloudera/impala/catalog/PartitionNotFoundException.java deleted file mode 100644 index 9964a3c..0000000 --- a/fe/src/main/java/com/cloudera/impala/catalog/PartitionNotFoundException.java +++ /dev/null @@ -1,31 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.catalog; - - -/** - * Thrown when a partition cannot be found in the catalog. - */ -public class PartitionNotFoundException extends CatalogException { - // Dummy serial UID to avoid Eclipse warnings - private static final long serialVersionUID = -2203080667446640542L; - - public PartitionNotFoundException(String s) { super(s); } - - public PartitionNotFoundException(String s, Exception cause) { super(s, cause); } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/PartitionStatsUtil.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/PartitionStatsUtil.java b/fe/src/main/java/com/cloudera/impala/catalog/PartitionStatsUtil.java deleted file mode 100644 index 2f2022f..0000000 --- a/fe/src/main/java/com/cloudera/impala/catalog/PartitionStatsUtil.java +++ /dev/null @@ -1,141 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package com.cloudera.impala.catalog; - -import com.cloudera.impala.thrift.TPartitionStats; -import com.cloudera.impala.common.JniUtil; -import com.cloudera.impala.common.ImpalaException; -import com.cloudera.impala.common.ImpalaRuntimeException; -import com.cloudera.impala.util.MetaStoreUtil; - -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import org.apache.commons.codec.binary.Base64; -import org.apache.thrift.protocol.TCompactProtocol; -import org.apache.thrift.TSerializer; -import org.apache.thrift.TException; -import com.google.common.base.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - -/** - * Handles serialising and deserialising intermediate statistics from the Hive MetaStore - * via the parameters map attached to every Hive partition object. - */ -public class PartitionStatsUtil { - public static final String INCREMENTAL_STATS_NUM_CHUNKS = - "impala_intermediate_stats_num_chunks"; - - public static final String INCREMENTAL_STATS_CHUNK_PREFIX = - "impala_intermediate_stats_chunk"; - - private final static Logger LOG = LoggerFactory.getLogger(PartitionStatsUtil.class); - - /** - * Reconstructs a TPartitionStats object from its serialised form in the given parameter - * map. Returns null if no stats are serialised, and throws an exception if there was an - * error during deserialisation. 
- */ - public static TPartitionStats partStatsFromParameters( - Map<String, String> hmsParameters) throws ImpalaException { - if (hmsParameters == null) return null; - String numChunksStr = hmsParameters.get(INCREMENTAL_STATS_NUM_CHUNKS); - if (numChunksStr == null) return null; - int numChunks = Integer.parseInt(numChunksStr); - if (numChunks == 0) return null; - - Preconditions.checkState(numChunks >= 0); - StringBuilder encodedStats = new StringBuilder(); - for (int i = 0; i < numChunks; ++i) { - String chunk = hmsParameters.get(INCREMENTAL_STATS_CHUNK_PREFIX + i); - if (chunk == null) { - throw new ImpalaRuntimeException("Missing stats chunk: " + i); - } - encodedStats.append(chunk); - } - - byte[] decodedStats = Base64.decodeBase64(encodedStats.toString()); - TCompactProtocol.Factory protocolFactory = new TCompactProtocol.Factory(); - TPartitionStats ret = new TPartitionStats(); - JniUtil.deserializeThrift(protocolFactory, ret, decodedStats); - return ret; - } - - /** - * Serialises a TPartitionStats object to a partition. - */ - public static void partStatsToParameters(TPartitionStats partStats, - HdfsPartition partition) { - // null stats means logically delete the stats from this partition - if (partStats == null) { - deletePartStats(partition); - return; - } - - // The HMS has a 4k (as of CDH5.2) limit on the length of any parameter string. The - // serialised version of the partition stats is often larger than this. Therefore, we - // naively 'chunk' the byte string into 4k pieces, and store the number of pieces in a - // separate parameter field. - // - // The object itself is first serialised by Thrift, and then base-64 encoded to be a - // valid string. This inflates its length somewhat; we may want to consider a - // different scheme or at least understand why this scheme doesn't seem much more - // effective than an ASCII representation. 
- try { - TCompactProtocol.Factory protocolFactory = new TCompactProtocol.Factory(); - TSerializer serializer = new TSerializer(protocolFactory); - byte[] serialized = serializer.serialize(partStats); - String base64 = new String(Base64.encodeBase64(serialized)); - List<String> chunks = - chunkStringForHms(base64, MetaStoreUtil.MAX_PROPERTY_VALUE_LENGTH); - partition.putToParameters( - INCREMENTAL_STATS_NUM_CHUNKS, Integer.toString(chunks.size())); - for (int i = 0; i < chunks.size(); ++i) { - partition.putToParameters(INCREMENTAL_STATS_CHUNK_PREFIX + i, chunks.get(i)); - } - } catch (TException e) { - LOG.info("Error saving partition stats: ", e); - // TODO: What to throw here? - } - } - - public static void deletePartStats(HdfsPartition partition) { - partition.putToParameters(INCREMENTAL_STATS_NUM_CHUNKS, "0"); - for (Iterator<String> it = partition.getParameters().keySet().iterator(); - it.hasNext(); ) { - if (it.next().startsWith(INCREMENTAL_STATS_CHUNK_PREFIX)) { - it.remove(); - } - } - } - - static private List<String> chunkStringForHms(String data, int chunkLen) { - int idx = 0; - List<String> ret = Lists.newArrayList(); - while (idx < data.length()) { - int remaining = data.length() - idx; - int chunkSize = (chunkLen > remaining) ? remaining : chunkLen; - ret.add(data.substring(idx, idx + chunkSize)); - idx += chunkSize; - } - return ret; - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/PrimitiveType.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/PrimitiveType.java b/fe/src/main/java/com/cloudera/impala/catalog/PrimitiveType.java deleted file mode 100644 index 4344b61..0000000 --- a/fe/src/main/java/com/cloudera/impala/catalog/PrimitiveType.java +++ /dev/null @@ -1,103 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. 
See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.catalog; - -import java.util.List; - -import com.cloudera.impala.thrift.TPrimitiveType; -import com.google.common.collect.Lists; - -public enum PrimitiveType { - INVALID_TYPE("INVALID_TYPE", -1, TPrimitiveType.INVALID_TYPE), - // NULL_TYPE - used only in LiteralPredicate and NullLiteral to make NULLs compatible - // with all other types. - NULL_TYPE("NULL_TYPE", 1, TPrimitiveType.NULL_TYPE), - BOOLEAN("BOOLEAN", 1, TPrimitiveType.BOOLEAN), - TINYINT("TINYINT", 1, TPrimitiveType.TINYINT), - SMALLINT("SMALLINT", 2, TPrimitiveType.SMALLINT), - INT("INT", 4, TPrimitiveType.INT), - BIGINT("BIGINT", 8, TPrimitiveType.BIGINT), - FLOAT("FLOAT", 4, TPrimitiveType.FLOAT), - DOUBLE("DOUBLE", 8, TPrimitiveType.DOUBLE), - DATE("DATE", 4, TPrimitiveType.DATE), - DATETIME("DATETIME", 8, TPrimitiveType.DATETIME), - // The timestamp structure is 12 bytes, Aligning to 8 bytes makes it 16. - TIMESTAMP("TIMESTAMP", 16, TPrimitiveType.TIMESTAMP), - // 8-byte pointer and 4-byte length indicator (12 bytes total). - // Aligning to 8 bytes so 16 total. - STRING("STRING", 16, TPrimitiveType.STRING), - VARCHAR("VARCHAR", 16, TPrimitiveType.VARCHAR), - - // Unsupported scalar type. 
- BINARY("BINARY", -1, TPrimitiveType.BINARY), - - // For decimal at the highest precision, the BE uses 16 bytes. - DECIMAL("DECIMAL", 16, TPrimitiveType.DECIMAL), - - // Fixed length char array. - CHAR("CHAR", -1, TPrimitiveType.CHAR); - - private final String description_; - private final int slotSize_; // size of tuple slot for this type - private final TPrimitiveType thriftType_; - - private PrimitiveType(String description, int slotSize, TPrimitiveType thriftType) { - description_ = description; - slotSize_ = slotSize; - thriftType_ = thriftType; - } - - @Override - public String toString() { - return description_; - } - - public static PrimitiveType fromThrift(TPrimitiveType t) { - switch (t) { - case INVALID_TYPE: return INVALID_TYPE; - case NULL_TYPE: return NULL_TYPE; - case BOOLEAN: return BOOLEAN; - case TINYINT: return TINYINT; - case SMALLINT: return SMALLINT; - case INT: return INT; - case BIGINT: return BIGINT; - case FLOAT: return FLOAT; - case DOUBLE: return DOUBLE; - case STRING: return STRING; - case VARCHAR: return VARCHAR; - case TIMESTAMP: return TIMESTAMP; - case CHAR: return CHAR; - case DECIMAL: return DECIMAL; - case BINARY: return BINARY; - } - return INVALID_TYPE; - } - - public TPrimitiveType toThrift() { return thriftType_; } - - public static List<TPrimitiveType> toThrift(PrimitiveType[] types) { - List<TPrimitiveType> result = Lists.newArrayList(); - for (PrimitiveType t: types) { - result.add(t.toThrift()); - } - return result; - } - - public int getSlotSize() { return slotSize_; } - public static int getMaxSlotSize() { return DECIMAL.slotSize_; } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/Role.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/Role.java b/fe/src/main/java/com/cloudera/impala/catalog/Role.java deleted file mode 100644 index 7ba1bac..0000000 --- 
a/fe/src/main/java/com/cloudera/impala/catalog/Role.java +++ /dev/null @@ -1,144 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.catalog; - -import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; - -import com.cloudera.impala.thrift.TCatalogObjectType; -import com.cloudera.impala.thrift.TRole; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -/** - * Represents a role in an authorization policy. This class is thread safe. - */ -public class Role implements CatalogObject { - private final TRole role_; - // The last role ID assigned, starts at 0. 
- private static AtomicInteger roleId_ = new AtomicInteger(0); - private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION; - - private final CatalogObjectCache<RolePrivilege> rolePrivileges_ = - new CatalogObjectCache<RolePrivilege>(); - - public Role(String roleName, Set<String> grantGroups) { - role_ = new TRole(); - role_.setRole_name(roleName); - role_.setRole_id(roleId_.incrementAndGet()); - role_.setGrant_groups(Lists.newArrayList(grantGroups)); - } - - private Role(TRole role) { - role_ = role; - } - - /** - * Adds a privilege to the role. Returns true if the privilege was added successfully - * or false if there was a newer version of the privilege already added to the role. - */ - public boolean addPrivilege(RolePrivilege privilege) { - return rolePrivileges_.add(privilege); - } - - /** - * Returns all privileges for this role. If no privileges have been added to the role - * an empty list will be returned. - */ - public List<RolePrivilege> getPrivileges() { - return Lists.newArrayList(rolePrivileges_.getValues()); - } - - /** - * Returns all privilege names for this role, or an empty set of no privileges are - * granted to the role. - */ - public Set<String> getPrivilegeNames() { - return Sets.newHashSet(rolePrivileges_.keySet()); - } - - /** - * Gets a privilege with the given name from this role. If no privilege exists - * with this name null is returned. - */ - public RolePrivilege getPrivilege(String privilegeName) { - return rolePrivileges_.get(privilegeName); - } - - /** - * Removes a privilege with the given name from the role. Returns the removed - * privilege or null if no privilege exists with this name. - */ - public RolePrivilege removePrivilege(String privilegeName) { - return rolePrivileges_.remove(privilegeName); - } - - /** - * Adds a new grant group to this role. 
- */ - public synchronized void addGrantGroup(String groupName) { - if (role_.getGrant_groups().contains(groupName)) return; - role_.addToGrant_groups(groupName); - } - - /** - * Removes a grant group from this role. - */ - public synchronized void removeGrantGroup(String groupName) { - role_.getGrant_groups().remove(groupName); - // Should never have duplicates in the list of groups. - Preconditions.checkState(!role_.getGrant_groups().contains(groupName)); - } - - /** - * Returns the Thrift representation of the role. - */ - public TRole toThrift() { - return role_; - } - - /** - * Creates a Role from a TRole thrift struct. - */ - public static Role fromThrift(TRole thriftRole) { - return new Role(thriftRole); - } - - /** - * Gets the set of group names that have been granted this role or an empty - * Set if no groups have been granted the role. - */ - public Set<String> getGrantGroups() { - return Sets.newHashSet(role_.getGrant_groups()); - } - @Override - public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.ROLE; } - @Override - public String getName() { return role_.getRole_name(); } - public int getId() { return role_.getRole_id(); } - @Override - public synchronized long getCatalogVersion() { return catalogVersion_; } - @Override - public synchronized void setCatalogVersion(long newVersion) { - catalogVersion_ = newVersion; - } - @Override - public boolean isLoaded() { return true; } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/RolePrivilege.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/RolePrivilege.java b/fe/src/main/java/com/cloudera/impala/catalog/RolePrivilege.java deleted file mode 100644 index 37d91d8..0000000 --- a/fe/src/main/java/com/cloudera/impala/catalog/RolePrivilege.java +++ /dev/null @@ -1,138 +0,0 @@ -// Licensed to the Apache 
Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.catalog; - -import java.util.List; - -import org.apache.log4j.Logger; - -import com.cloudera.impala.thrift.TCatalogObjectType; -import com.cloudera.impala.thrift.TPrivilege; -import com.cloudera.impala.thrift.TPrivilegeLevel; -import com.cloudera.impala.thrift.TPrivilegeScope; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -/** - * Represents a privilege that has been granted to a role in an authorization policy. - * This class is thread safe. - */ -public class RolePrivilege implements CatalogObject { - private static final Logger LOG = Logger.getLogger(AuthorizationPolicy.class); - // These Joiners are used to build role names. For simplicity, the role name we - // use can also be sent to the Sentry library to perform authorization checks - // so we build them in the same format. 
- private static final Joiner AUTHORIZABLE_JOINER = Joiner.on("->"); - private static final Joiner KV_JOINER = Joiner.on("="); - - private final TPrivilege privilege_; - private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION; - - private RolePrivilege(TPrivilege privilege) { - privilege_ = privilege; - } - - public TPrivilege toThrift() { return privilege_; } - public static RolePrivilege fromThrift(TPrivilege privilege) { - return new RolePrivilege(privilege); - } - - /** - * Builds a privilege name for the given TPrivilege object. For simplicity, this name is - * generated in a format that can be sent to the Sentry client to perform authorization - * checks. - */ - public static String buildRolePrivilegeName(TPrivilege privilege) { - List<String> authorizable = Lists.newArrayListWithExpectedSize(4); - try { - Preconditions.checkNotNull(privilege); - TPrivilegeScope scope = privilege.getScope(); - Preconditions.checkNotNull(scope); - switch (scope) { - case SERVER: { - authorizable.add(KV_JOINER.join("server", privilege.getServer_name())); - break; - } - case URI: { - authorizable.add(KV_JOINER.join("server", privilege.getServer_name())); - authorizable.add(KV_JOINER.join("uri", privilege.getUri())); - break; - } - case DATABASE: { - authorizable.add(KV_JOINER.join("server", privilege.getServer_name())); - authorizable.add(KV_JOINER.join("db", privilege.getDb_name())); - break; - } - case TABLE: { - authorizable.add(KV_JOINER.join("server", privilege.getServer_name())); - authorizable.add(KV_JOINER.join("db", privilege.getDb_name())); - authorizable.add(KV_JOINER.join("table", privilege.getTable_name())); - break; - } - case COLUMN: { - authorizable.add(KV_JOINER.join("server", privilege.getServer_name())); - authorizable.add(KV_JOINER.join("db", privilege.getDb_name())); - authorizable.add(KV_JOINER.join("table", privilege.getTable_name())); - authorizable.add(KV_JOINER.join("column", privilege.getColumn_name())); - break; - } - default: { - throw new 
UnsupportedOperationException( - "Unknown privilege scope: " + scope.toString()); - } - } - - // The ALL privilege is always implied and does not need to be included as part - // of the name. - if (privilege.getPrivilege_level() != TPrivilegeLevel.ALL) { - authorizable.add(KV_JOINER.join("action", - privilege.getPrivilege_level().toString())); - } - return AUTHORIZABLE_JOINER.join(authorizable); - } catch (Exception e) { - // Should never make it here unless the privilege is malformed. - LOG.error("ERROR: ", e); - return null; - } - } - - @Override - public TCatalogObjectType getCatalogObjectType() { - return TCatalogObjectType.PRIVILEGE; - } - @Override - public String getName() { return privilege_.getPrivilege_name(); } - public int getRoleId() { return privilege_.getRole_id(); } - @Override - public synchronized long getCatalogVersion() { return catalogVersion_; } - @Override - public synchronized void setCatalogVersion(long newVersion) { - catalogVersion_ = newVersion; - } - @Override - public boolean isLoaded() { return true; } - - // The time this role was created. Used to quickly check if the same privilege - // was dropped and re-created. Assumes a role will not be created + dropped + created - // in less than 1ms. Returns -1 if create_time_ms was not set for the privilege. - public long getCreateTimeMs() { - return privilege_.isSetCreate_time_ms() ? 
privilege_.getCreate_time_ms() : -1L; - } - public TPrivilegeScope getScope() { return privilege_.getScope(); } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/RowFormat.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/RowFormat.java b/fe/src/main/java/com/cloudera/impala/catalog/RowFormat.java deleted file mode 100644 index 76dcacb..0000000 --- a/fe/src/main/java/com/cloudera/impala/catalog/RowFormat.java +++ /dev/null @@ -1,109 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.catalog; - -import java.util.Map; - -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; - -import com.cloudera.impala.analysis.StringLiteral; -import com.cloudera.impala.thrift.TTableRowFormat; -import com.google.common.base.Preconditions; - -/** - * Defines the physical (on-disk) format for a table's data. This is used when creating - * a new table to specify how to interpret the fields (columns) and lines (rows) in a - * a data file. 
- */ -public class RowFormat { - // Default row format - public final static RowFormat DEFAULT_ROW_FORMAT = new RowFormat(null, null, null); - - private final String fieldDelimiter_; - private final String lineDelimiter_; - private final String escapeChar_; - - private RowFormat(String fieldDelimiter, String lineDelimiter, String escapeChar, - boolean unescape) { - if (unescape) { - fieldDelimiter_ = getUnescapedValueOrNull(fieldDelimiter); - lineDelimiter_ = getUnescapedValueOrNull(lineDelimiter); - escapeChar_ = getUnescapedValueOrNull(escapeChar); - } else { - fieldDelimiter_ = fieldDelimiter; - lineDelimiter_ = lineDelimiter; - escapeChar_ = escapeChar; - } - } - - /** - * Creates a new instance of the RowFormat class, unescaping the values of - * field delimiter, line delimiter, and escape char. - */ - public RowFormat(String fieldDelimiter, String lineDelimiter, String escapeChar) { - this(fieldDelimiter, lineDelimiter, escapeChar, true); - } - - public String getFieldDelimiter() { - return fieldDelimiter_; - } - - public String getLineDelimiter() { - return lineDelimiter_; - } - - public String getEscapeChar() { - return escapeChar_; - } - - public boolean isDefault() { - return fieldDelimiter_ == null && lineDelimiter_ == null && escapeChar_ == null; - } - - private static String getUnescapedValueOrNull(String value) { - return value == null ? null : new StringLiteral(value).getUnescapedValue(); - } - - public TTableRowFormat toThrift() { - TTableRowFormat tableRowFormat = new TTableRowFormat(); - tableRowFormat.setField_terminator(getFieldDelimiter()); - tableRowFormat.setLine_terminator(getLineDelimiter()); - tableRowFormat.setEscaped_by(getEscapeChar()); - return tableRowFormat; - } - - public static RowFormat fromThrift(TTableRowFormat tableRowFormat) { - if (tableRowFormat == null) { - return RowFormat.DEFAULT_ROW_FORMAT; - } - // When creating a RowFormat from thrift, don't unescape the values, they should have - // already been unescaped. 
- return new RowFormat(tableRowFormat.getField_terminator(), - tableRowFormat.getLine_terminator(), tableRowFormat.getEscaped_by(), false); - } - - /** - * Returns the RowFormat for the storage descriptor. - */ - public static RowFormat fromStorageDescriptor(StorageDescriptor sd) { - Preconditions.checkNotNull(sd); - Map<String, String> params = sd.getSerdeInfo().getParameters(); - return new RowFormat(params.get("field.delim"), params.get("line.delim"), - params.get("escape.delim")); - } -}
