http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java index 27839b3..1a5e2ec 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java @@ -17,6 +17,7 @@ package org.apache.impala.catalog; +import java.util.List; import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; @@ -24,24 +25,35 @@ import java.util.TreeMap; import org.apache.impala.thrift.TCatalogObject; import org.apache.impala.thrift.TTable; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; /** - * The impalad catalog cache can be modified by either a state store update or by a - * direct ("fast") update that applies the result of a catalog operation to the cache - * out-of-band of a state store update. This thread safe log tracks the divergence - * (due to direct updates to the cache) of this impalad's cache from the last state - * store update. This log is needed to ensure work is never undone. For example, - * consider the following sequence of events: - * t1: [Direct Update] - Add item A - (Catalog Version 9) - * t2: [Direct Update] - Drop item A - (Catalog Version 10) - * t3: [StateStore Update] - (From Catalog Version 9) - * This log is used to ensure the state store update in t3 does not undo the drop in t2. + * Represents a log of deleted catalog objects. * - * Currently this only tracks objects that were dropped, since the catalog cache can be - * queried to check if an object was added. TODO: Also track object additions from async - * operations. This could be used to to "replay" the log in the case of a catalog reset - * ("invalidate metadata"). 
Currently, the catalog may briefly go back in time if - * "invalidate metadata" is run concurrently with async catalog operations. + * + * There are currently two use cases for this log: + * + * a) Processing catalog updates in the impalads + * The impalad catalog cache can be modified by either a state store update or by a + * direct update that applies the result of a catalog operation to the cache + * out-of-band of a state store update. This thread safe log tracks the divergence + * (due to direct updates to the cache) of this impalad's cache from the last state + * store update. This log is needed to ensure work is never undone. For example, + * consider the following sequence of events: + * t1: [Direct Update] - Add item A - (Catalog Version 9) + * t2: [Direct Update] - Drop item A - (Catalog Version 10) + * t3: [StateStore Update] - (From Catalog Version 9) + * This log is used to ensure the state store update in t3 does not undo the drop in t2. + * Currently this only tracks objects that were dropped, since the catalog cache can be + * queried to check if an object was added. TODO: Also track object additions from async + * operations. This could be used to "replay" the log in the case of a catalog reset + * ("invalidate metadata"). Currently, the catalog may briefly go back in time if + * "invalidate metadata" is run concurrently with async catalog operations. + * + * b) Building catalog topic updates in the catalogd + * The catalogd uses this log to identify catalog objects that have been deleted + * since the last catalog topic update. Once the catalog topic update is constructed, + * the old entries in the log are garbage collected to prevent the log from growing + * indefinitely. */ public class CatalogDeltaLog { // Map of the catalog version an object was removed from the catalog @@ -58,6 +70,17 @@ public class CatalogDeltaLog { } /** + * Retrieve all the removed catalog objects with versions in range + * (fromVersion, toVersion].
+ */ + public synchronized List<TCatalogObject> retrieveObjects(long fromVersion, + long toVersion) { + SortedMap<Long, TCatalogObject> objects = + removedCatalogObjects_.subMap(fromVersion + 1, toVersion + 1); + return ImmutableList.<TCatalogObject>copyOf(objects.values()); + } + + /** * Given the current catalog version, removes all items with catalogVersion < * currectCatalogVersion. Such objects do not need to be tracked in the delta * log anymore because they are consistent with the state store's view of the @@ -85,36 +108,8 @@ public class CatalogDeltaLog { SortedMap<Long, TCatalogObject> candidateObjects = removedCatalogObjects_.tailMap(catalogObject.getCatalog_version()); for (Map.Entry<Long, TCatalogObject> entry: candidateObjects.entrySet()) { - if (objectNamesMatch(catalogObject, entry.getValue())) return true; + if (Catalog.keyEquals(catalogObject, entry.getValue())) return true; } return false; } - - /** - * Returns true if the two objects have the same object type and name. - * TODO: Use global object IDs everywhere instead of tracking catalog objects by name. 
- */ - private boolean objectNamesMatch(TCatalogObject first, TCatalogObject second) { - if (first.getType() != second.getType()) return false; - switch (first.getType()) { - case DATABASE: - return first.getDb().getDb_name().equalsIgnoreCase(second.getDb().getDb_name()); - case TABLE: - case VIEW: - TTable firstTbl = first.getTable(); - return firstTbl.getDb_name().equalsIgnoreCase(second.getTable().getDb_name()) && - firstTbl.getTbl_name().equalsIgnoreCase(second.getTable().getTbl_name()); - case FUNCTION: - return first.getFn().getSignature().equals(second.getFn().getSignature()) && - first.getFn().getName().equals(second.getFn().getName()); - case ROLE: - return first.getRole().getRole_name().equalsIgnoreCase( - second.getRole().getRole_name()); - case PRIVILEGE: - return first.getPrivilege().getPrivilege_name().equalsIgnoreCase( - second.getPrivilege().getPrivilege_name()) && - first.getPrivilege().getRole_id() == second.getPrivilege().getRole_id(); - default: return false; - } - } -} \ No newline at end of file +}
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java b/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java index a2d8ca9..cc4c495 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java @@ -29,6 +29,9 @@ public interface CatalogObject { // Returns the unqualified object name. public String getName(); + // Returns the unique name of this catalog object. + public String getUniqueName(); + // Returns the version of this catalog object. public long getCatalogVersion(); @@ -37,4 +40,4 @@ public interface CatalogObject { // Returns true if this CatalogObject has had its metadata loaded, false otherwise. public boolean isLoaded(); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java index c578e41..d882cdb 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java @@ -30,6 +30,8 @@ import com.google.common.collect.Lists; /** * Thread safe cache for storing CatalogObjects. Enforces that updates to existing * entries only get applied if the new/updated object has a larger catalog version. + * add() and remove() functions also update the entries of the global instance of + * CatalogObjectVersionQueue which keeps track of the catalog objects versions. 
*/ public class CatalogObjectCache<T extends CatalogObject> implements Iterable<T> { private final boolean caseInsensitiveKeys_; @@ -71,13 +73,19 @@ public class CatalogObjectCache<T extends CatalogObject> implements Iterable<T> String key = catalogObject.getName(); if (caseInsensitiveKeys_) key = key.toLowerCase(); T existingItem = metadataCache_.putIfAbsent(key, catalogObject); - if (existingItem == null) return true; + if (existingItem == null) { + CatalogObjectVersionQueue.INSTANCE.addVersion( + catalogObject.getCatalogVersion()); + return true; + } if (existingItem.getCatalogVersion() < catalogObject.getCatalogVersion()) { // When existingItem != null it indicates there was already an existing entry // associated with the key. Add the updated object iff it has a catalog // version greater than the existing entry. metadataCache_.put(key, catalogObject); + CatalogObjectVersionQueue.INSTANCE.updateVersions( + existingItem.getCatalogVersion(), catalogObject.getCatalogVersion()); return true; } return false; @@ -89,7 +97,12 @@ public class CatalogObjectCache<T extends CatalogObject> implements Iterable<T> */ public synchronized T remove(String name) { if (caseInsensitiveKeys_) name = name.toLowerCase(); - return metadataCache_.remove(name); + T removedObject = metadataCache_.remove(name); + if (removedObject != null) { + CatalogObjectVersionQueue.INSTANCE.removeVersion( + removedObject.getCatalogVersion()); + } + return removedObject; } /** @@ -144,4 +157,4 @@ public class CatalogObjectCache<T extends CatalogObject> implements Iterable<T> public Iterator<T> iterator() { return metadataCache_.values().iterator(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/CatalogObjectImpl.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObjectImpl.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectImpl.java new file mode 100644 index 0000000..321355c --- /dev/null +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectImpl.java @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.catalog; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.impala.common.NotImplementedException; +import org.apache.impala.thrift.TCatalogObjectType; + +abstract public class CatalogObjectImpl implements CatalogObject { + // Current catalog version of this object. Initialized to + // Catalog.INITIAL_CATALOG_VERSION. 
+ private AtomicLong catalogVersion_ = new AtomicLong(Catalog.INITIAL_CATALOG_VERSION); + + protected CatalogObjectImpl() {} + + @Override + public long getCatalogVersion() { return catalogVersion_.get(); } + + @Override + public void setCatalogVersion(long newVersion) { catalogVersion_.set(newVersion); } + + @Override + public boolean isLoaded() { return true; } + + @Override + public String getName() { return ""; } + + @Override + public String getUniqueName() { return ""; } +} + http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java new file mode 100644 index 0000000..5fcd398 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.catalog; + +import java.util.List; +import java.util.PriorityQueue; + +import com.google.common.base.Preconditions; + +/** + * Singleton class used to maintain the versions of all the catalog objects stored in a + * local catalog cache. Simple wrapper around a priority queue which stores the catalog + * object versions, allowing O(1) retrieval of the minimum object version currently + * stored in the cache. Provides a simple API to add, remove and update catalog object + * versions. Not thread-safe. + * + * The primary use case of this class is to allow an Impalad catalog cache to determine + * when the result set of an INVALIDATE METADATA operation has been applied locally by + * keeping track of the minimum catalog object version. + */ +public class CatalogObjectVersionQueue { + private final PriorityQueue<Long> objectVersions_ = new PriorityQueue<>(); + + public static final CatalogObjectVersionQueue INSTANCE = + new CatalogObjectVersionQueue(); + + private CatalogObjectVersionQueue() {} + + public void updateVersions(long oldVersion, long newVersion) { + removeVersion(oldVersion); + addVersion(newVersion); + } + + public void removeVersion(long oldVersion) { + objectVersions_.remove(oldVersion); + } + + public void addVersion(long newVersion) { + objectVersions_.add(newVersion); + } + + public long getMinimumVersion() { + Long minVersion = objectVersions_.peek(); + return minVersion != null ? minVersion : 0; + } + + public void addAll(List<? extends CatalogObject> catalogObjects) { + for (CatalogObject catalogObject: catalogObjects) { + addVersion(catalogObject.getCatalogVersion()); + } + } + + public void removeAll(List<?
extends CatalogObject> catalogObjects) { + for (CatalogObject catalogObject: catalogObjects) { + removeVersion(catalogObject.getCatalogVersion()); + } + } +} http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index d2a0a82..f75b0a8 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -32,6 +33,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.fs.Path; @@ -45,9 +47,9 @@ import org.apache.hadoop.hive.metastore.api.ResourceType; import org.apache.hadoop.hive.metastore.api.ResourceUri; import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.ql.exec.FunctionUtils; -import org.apache.impala.analysis.Analyzer; import org.apache.impala.authorization.SentryConfig; import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; +import org.apache.impala.catalog.TopicUpdateLog.Entry; import org.apache.impala.common.FileSystemUtil; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.ImpalaRuntimeException; @@ -58,8 +60,9 @@ import org.apache.impala.hive.executor.UdfExecutor; import 
org.apache.impala.thrift.TCatalog; import org.apache.impala.thrift.TCatalogObject; import org.apache.impala.thrift.TCatalogObjectType; +import org.apache.impala.thrift.TCatalogUpdateResult; import org.apache.impala.thrift.TFunction; -import org.apache.impala.thrift.TGetAllCatalogObjectsResponse; +import org.apache.impala.thrift.TGetCatalogDeltaResponse; import org.apache.impala.thrift.TPartitionKeyValue; import org.apache.impala.thrift.TPrivilege; import org.apache.impala.thrift.TTable; @@ -73,6 +76,8 @@ import org.apache.thrift.protocol.TCompactProtocol; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -80,14 +85,62 @@ import com.google.common.collect.Sets; /** * Specialized Catalog that implements the CatalogService specific Catalog * APIs. The CatalogServiceCatalog manages loading of all the catalog metadata - * and processing of DDL requests. For each DDL request, the CatalogServiceCatalog - * will return the catalog version that the update will show up in. The client - * can then wait until the statestore sends an update that contains that catalog - * version. - * The CatalogServiceCatalog also manages a global "catalog version". The version - * is incremented and assigned to a CatalogObject whenever it is - * added/modified/removed from the catalog. This means each CatalogObject will have a - * unique version and assigned versions are strictly increasing. + * and processing of DDL requests. The CatalogServiceCatalog maintains a global + * "catalog version". The version is incremented and assigned to a CatalogObject whenever + * it is added/modified/removed from the catalog. This means each CatalogObject will have + * a unique version and assigned versions are strictly increasing. 
+ * + * Periodically, the CatalogServiceCatalog collects a delta of catalog updates (based on a + * specified catalog version) and constructs a topic update to be sent to the statestore. + * Each catalog topic update is defined by a range of catalog versions (from, to] and the + * CatalogServiceCatalog guarantees that every catalog object that has a version in the + * specified range is included in the catalog topic update. Concurrent DDL requests are + * allowed while a topic update is in progress. Hence, there is a non-zero probability + * that frequently modified catalog objects may keep skipping topic updates. That can + * happen when by the time a topic update thread tries to collect an object update, that + * object is being modified by another metadata operation, causing its version to surpass + * the 'to' version of the topic update. To ensure that all catalog updates + * are eventually included in a catalog topic update, we keep track of the number of times + * each catalog object has skipped a topic update and if that number exceeds a specified + * threshold, we add the catalog object to the next topic update even if its version is + * higher than the 'to' version of the topic update. As a result, the same version of an + * object might be sent in two subsequent topic updates. + * + * The CatalogServiceCatalog maintains two logs: + * - Delete log. Since deleted objects are removed from the cache, the cache itself is + * not useful for tracking deletions. This log is used for populating the list of + * deleted objects during a topic update by recording the catalog objects that + * have been removed from the catalog. An entry with a new version is added to this log + * every time an object is removed (e.g. dropTable). Incrementing an object's version + * and adding it to the delete log should be performed atomically. An entry is removed + * from this log by the topic update thread when the associated deletion entry is + * added to a topic update. 
+ * - Topic update log. This log records information about the catalog objects that have + * been included in a catalog topic update. Only the thread that is processing the + * topic update is responsible for adding, updating, and removing entries from the log. + * All other operations (e.g. addTable) only read topic update log entries but never + * modify them. Each entry includes the number of times a catalog object has + * skipped a topic update, which version of the object was last sent in a topic update + * and what was the version of that topic update. Entries of the topic update log are + * garbage-collected every TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates by the topic + * update processing thread to prevent the log from growing indefinitely. Metadata + * operations using SYNC_DDL inspect this log to identify the catalog topic + * version that the issuing impalad must wait for in order to ensure that the effects + * of this operation have been broadcast to all the coordinators. + * + * Known anomalies with SYNC_DDL: + * The time-based cleanup process of the topic update log entries may cause metadata + * operations that use SYNC_DDL to hang while waiting for specific topic update log + * entries. That could happen if the thread processing the metadata operation stalls + * for a long period of time (longer than the time to process + * TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates) between the time the operation was + * applied in the catalog cache and the time the SYNC_DDL version was checked. To reduce + * the probability of such an event, we set the value of the + * TOPIC_UPDATE_LOG_GC_FREQUENCY to a large value. Also, to prevent metadata operations + * from hanging in that path due to unknown issues (e.g. bugs), operations using + * SYNC_DDL are not allowed to wait indefinitely for specific topic log entries and an + * exception is thrown if the specified max wait time is exceeded. See + * waitForSyncDdlVersion() for more details.
* * Table metadata for IncompleteTables (not fully loaded tables) are loaded in the * background by the TableLoadingMgr; tables can be prioritized for loading by calling @@ -100,7 +153,7 @@ import com.google.common.collect.Sets; * out-of-band of the table loading thread pool. * * See the class comments in CatalogOpExecutor for a description of the locking protocol - * that should be employed if both the catalog lock and table locks need to be held at + * that should be employed if both the version lock and table locks need to be held at * the same time. * * TODO: Consider removing on-demand loading and have everything go through the table @@ -110,6 +163,7 @@ public class CatalogServiceCatalog extends Catalog { private static final Logger LOG = Logger.getLogger(CatalogServiceCatalog.class); private static final int INITIAL_META_STORE_CLIENT_POOL_SIZE = 10; + private static final int MAX_NUM_SKIPPED_TOPIC_UPDATES = 2; private final TUniqueId catalogServiceId_; // Fair lock used to synchronize reads/writes of catalogVersion_. Because this lock @@ -123,11 +177,11 @@ public class CatalogServiceCatalog extends Catalog { // from the metastore. // * During renameTable(), because a table must be removed and added to the catalog // atomically (potentially in a different database). - private final ReentrantReadWriteLock catalogLock_ = new ReentrantReadWriteLock(true); + private final ReentrantReadWriteLock versionLock_ = new ReentrantReadWriteLock(true); // Last assigned catalog version. Starts at INITIAL_CATALOG_VERSION and is incremented // with each update to the Catalog. Continued across the lifetime of the object. - // Protected by catalogLock_. + // Protected by versionLock_. // TODO: Handle overflow of catalogVersion_ and nextTableId_. // TODO: The name of this variable is misleading and can be interpreted as a property // of the catalog server. 
Rename into something that indicates its role as a global @@ -150,6 +204,19 @@ public class CatalogServiceCatalog extends Catalog { // Local temporary directory to copy UDF Jars. private static String localLibraryPath_; + // Log of deleted catalog objects. + private final CatalogDeltaLog deleteLog_; + + // Version of the last topic update returned to the statestore. + // The version of a topic update is the catalog version of the CATALOG object + // that is added to it. + private final AtomicLong lastSentTopicUpdate_ = new AtomicLong(-1); + + // Wait time for a topic update. + private static final long TOPIC_UPDATE_WAIT_TIMEOUT_MS = 10000; + + private final TopicUpdateLog topicUpdateLog_ = new TopicUpdateLog(); + /** * Initialize the CatalogServiceCatalog. If 'loadInBackground' is true, table metadata * will be loaded in the background. 'initialHmsCnxnTimeoutSec' specifies the time (in @@ -169,7 +236,7 @@ public class CatalogServiceCatalog extends Catalog { // local, etc.) if (FileSystemUtil.getDefaultFileSystem() instanceof DistributedFileSystem) { cachePoolReader_.scheduleAtFixedRate( - new CachePoolReader(), 0, 1, TimeUnit.MINUTES); + new CachePoolReader(false), 0, 1, TimeUnit.MINUTES); } } catch (IOException e) { LOG.error("Couldn't identify the default FS. Cache Pool reader will be disabled."); @@ -180,6 +247,7 @@ public class CatalogServiceCatalog extends Catalog { sentryProxy_ = null; } localLibraryPath_ = new String("file://" + localLibraryPath); + deleteLog_ = new CatalogDeltaLog(); } // Timeout for acquiring a table lock @@ -189,7 +257,7 @@ public class CatalogServiceCatalog extends Catalog { private static final int TBL_LOCK_RETRY_MS = 10; /** - * Tries to acquire catalogLock_ and the lock of 'tbl' in that order. Returns true if it + * Tries to acquire versionLock_ and the lock of 'tbl' in that order. Returns true if it * successfully acquires both within TBL_LOCK_TIMEOUT_MS millisecs; both locks are held * when the function returns. 
Returns false otherwise and no lock is held in this case. */ @@ -197,7 +265,7 @@ public class CatalogServiceCatalog extends Catalog { long begin = System.currentTimeMillis(); long end; do { - catalogLock_.writeLock().lock(); + versionLock_.writeLock().lock(); if (tbl.getLock().tryLock()) { if (LOG.isTraceEnabled()) { end = System.currentTimeMillis(); @@ -206,7 +274,7 @@ public class CatalogServiceCatalog extends Catalog { } return true; } - catalogLock_.writeLock().unlock(); + versionLock_.writeLock().unlock(); try { // Sleep to avoid spinning and allow other operations to make progress. Thread.sleep(TBL_LOCK_RETRY_MS); @@ -223,12 +291,17 @@ public class CatalogServiceCatalog extends Catalog { * Called periodically by the cachePoolReader_. */ protected class CachePoolReader implements Runnable { - + // If true, existing cache pools will get a new catalog version and, consequently, + // they will be added to the next topic update, triggering an update in each + // coordinator's local catalog cache. This is needed for the case of INVALIDATE + // METADATA where a new catalog version needs to be assigned to every catalog object. + private final boolean incrementVersions_; /** * This constructor is needed to create a non-threaded execution of the class. */ - public CachePoolReader() { + public CachePoolReader(boolean incrementVersions) { super(); + incrementVersions_ = incrementVersions; } public void run() { @@ -249,28 +322,45 @@ public class CatalogServiceCatalog extends Catalog { return; } - catalogLock_.writeLock().lock(); + versionLock_.writeLock().lock(); try { // Determine what has changed relative to what we have cached. 
Set<String> droppedCachePoolNames = Sets.difference( hdfsCachePools_.keySet(), currentCachePools.keySet()); Set<String> createdCachePoolNames = Sets.difference( currentCachePools.keySet(), hdfsCachePools_.keySet()); + Set<String> survivingCachePoolNames = Sets.difference( + hdfsCachePools_.keySet(), droppedCachePoolNames); // Add all new cache pools. for (String createdCachePool: createdCachePoolNames) { HdfsCachePool cachePool = new HdfsCachePool( currentCachePools.get(createdCachePool)); - cachePool.setCatalogVersion( - CatalogServiceCatalog.this.incrementAndGetCatalogVersion()); + cachePool.setCatalogVersion(incrementAndGetCatalogVersion()); hdfsCachePools_.add(cachePool); } // Remove dropped cache pools. for (String cachePoolName: droppedCachePoolNames) { - hdfsCachePools_.remove(cachePoolName); - CatalogServiceCatalog.this.incrementAndGetCatalogVersion(); + HdfsCachePool cachePool = hdfsCachePools_.remove(cachePoolName); + if (cachePool != null) { + cachePool.setCatalogVersion(incrementAndGetCatalogVersion()); + TCatalogObject removedObject = + new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL, + cachePool.getCatalogVersion()); + removedObject.setCache_pool(cachePool.toThrift()); + deleteLog_.addRemovedObject(removedObject); + } + } + if (incrementVersions_) { + // Increment the version of existing pools in order to be added to the next + // topic update. + for (String survivingCachePoolName: survivingCachePoolNames) { + HdfsCachePool cachePool = hdfsCachePools_.get(survivingCachePoolName); + Preconditions.checkNotNull(cachePool); + cachePool.setCatalogVersion(incrementAndGetCatalogVersion()); + } } } finally { - catalogLock_.writeLock().unlock(); + versionLock_.writeLock().unlock(); } } } @@ -297,120 +387,347 @@ public class CatalogServiceCatalog extends Catalog { } /** - * Returns all known objects in the Catalog (Tables, Views, Databases, and - * Functions). 
Some metadata may be skipped for objects that have a catalog - * version < the specified "fromVersion". Takes a lock on the catalog to ensure this - * update contains a consistent snapshot of all items in the catalog. While holding the - * catalog lock, it locks each accessed table to protect against concurrent - * modifications. + * Identifies and returns the catalog objects that were added/modified/deleted in the + * catalog with versions > 'fromVersion'. It operates on a snapshot of the catalog + * without holding the catalog lock, which means that other concurrent metadata + * operations can still make progress while the catalog delta is computed. An entry in + * the topic update log is added for every catalog object that is included in the + * catalog delta. The log is examined by operations using SYNC_DDL to determine which + * topic update covers the result set of a metadata operation. Once the catalog delta is + * computed, delete log entries with versions below the delta's max catalog version are + * garbage collected. + */ + public TGetCatalogDeltaResponse getCatalogDelta(long fromVersion) { + // Maximum catalog version (inclusive) to be included in the catalog delta.
+ long toVersion = getCatalogVersion(); + TGetCatalogDeltaResponse resp = new TGetCatalogDeltaResponse(); + resp.setUpdated_objects(new ArrayList<TCatalogObject>()); + resp.setDeleted_objects(new ArrayList<TCatalogObject>()); + resp.setMax_catalog_version(toVersion); + + for (Db db: getAllDbs()) { + addDatabaseToCatalogDelta(db, fromVersion, toVersion, resp); + } + for (DataSource dataSource: getAllDataSources()) { + addDataSourceToCatalogDelta(dataSource, fromVersion, toVersion, resp); + } + for (HdfsCachePool cachePool: getAllHdfsCachePools()) { + addHdfsCachePoolToCatalogDelta(cachePool, fromVersion, toVersion, resp); + } + for (Role role: getAllRoles()) { + addRoleToCatalogDelta(role, fromVersion, toVersion, resp); + } + Set<String> updatedCatalogObjects = Sets.newHashSet(); + for (TCatalogObject catalogObj: resp.updated_objects) { + topicUpdateLog_.add(Catalog.toCatalogObjectKey(catalogObj), + new TopicUpdateLog.Entry(0, catalogObj.getCatalog_version(), + toVersion)); + updatedCatalogObjects.add(Catalog.toCatalogObjectKey(catalogObj)); + } + + // Identify the catalog objects that were removed from the catalog for which their + // versions are in range ('fromVersion', 'toVersion']. We need to make sure + // that we don't include "deleted" objects that were re-added to the catalog. + for (TCatalogObject removedObject: getDeletedObjects(fromVersion, toVersion)) { + if (!updatedCatalogObjects.contains( + Catalog.toCatalogObjectKey(removedObject))) { + topicUpdateLog_.add(Catalog.toCatalogObjectKey(removedObject), + new TopicUpdateLog.Entry(0, removedObject.getCatalog_version(), + toVersion)); + resp.addToDeleted_objects(removedObject); + } + } + // Each topic update should contain a single "TCatalog" object which is used to + // pass overall state on the catalog, such as the current version and the + // catalog service id. 
By setting the catalog version to the latest catalog + // version at this point, it ensures impalads will always bump their versions, + // even in the case where an object has been dropped. + TCatalogObject catalog = + new TCatalogObject(TCatalogObjectType.CATALOG, toVersion); + catalog.setCatalog(new TCatalog(catalogServiceId_)); + resp.addToUpdated_objects(catalog); + // Garbage collect the delete and topic update log. + deleteLog_.garbageCollect(toVersion); + topicUpdateLog_.garbageCollectUpdateLogEntries(toVersion); + lastSentTopicUpdate_.set(toVersion); + // Notify any operation that is waiting on the next topic update. + synchronized (topicUpdateLog_) { + topicUpdateLog_.notifyAll(); + } + return resp; + } + + /** + * Get a snapshot view of all the catalog objects that were deleted between versions + * ('fromVersion', 'toVersion']. */ - public TGetAllCatalogObjectsResponse getCatalogObjects(long fromVersion) { - TGetAllCatalogObjectsResponse resp = new TGetAllCatalogObjectsResponse(); - resp.setObjects(new ArrayList<TCatalogObject>()); - resp.setMax_catalog_version(Catalog.INITIAL_CATALOG_VERSION); - catalogLock_.readLock().lock(); + private List<TCatalogObject> getDeletedObjects(long fromVersion, long toVersion) { + versionLock_.readLock().lock(); try { - for (Db db: getDbs(PatternMatcher.MATCHER_MATCH_ALL)) { - TCatalogObject catalogDb = new TCatalogObject(TCatalogObjectType.DATABASE, - db.getCatalogVersion()); - catalogDb.setDb(db.toThrift()); - resp.addToObjects(catalogDb); - - for (String tblName: db.getAllTableNames()) { - TCatalogObject catalogTbl = new TCatalogObject(TCatalogObjectType.TABLE, - Catalog.INITIAL_CATALOG_VERSION); - - Table tbl = db.getTable(tblName); - if (tbl == null) { - LOG.error("Table: " + tblName + " was expected to be in the catalog " + - "cache. 
Skipping table for this update."); - continue; - } + return deleteLog_.retrieveObjects(fromVersion, toVersion); + } finally { + versionLock_.readLock().unlock(); + } + } - // Protect the table from concurrent modifications. - tbl.getLock().lock(); - try { - // Only add the extended metadata if this table's version is >= - // the fromVersion. - if (tbl.getCatalogVersion() >= fromVersion) { - try { - catalogTbl.setTable(tbl.toThrift()); - } catch (Exception e) { - if (LOG.isTraceEnabled()) { - LOG.trace(String.format("Error calling toThrift() on table %s.%s: %s", - db.getName(), tblName, e.getMessage()), e); - } - continue; - } - catalogTbl.setCatalog_version(tbl.getCatalogVersion()); - } else { - catalogTbl.setTable(new TTable(db.getName(), tblName)); - } - } finally { - tbl.getLock().unlock(); - } - resp.addToObjects(catalogTbl); - } + /** + * Get a snapshot view of all the databases in the catalog. + */ + private List<Db> getAllDbs() { + versionLock_.readLock().lock(); + try { + return ImmutableList.copyOf(dbCache_.get().values()); + } finally { + versionLock_.readLock().unlock(); + } + } - for (Function fn: db.getFunctions(null, new PatternMatcher())) { - TCatalogObject function = new TCatalogObject(TCatalogObjectType.FUNCTION, - fn.getCatalogVersion()); - function.setFn(fn.toThrift()); - resp.addToObjects(function); - } + /** + * Get a snapshot view of all the data sources in the catalog. + */ + private List<DataSource> getAllDataSources() { + versionLock_.readLock().lock(); + try { + return ImmutableList.copyOf(getDataSources()); + } finally { + versionLock_.readLock().unlock(); + } + } + + /** + * Get a snapshot view of all the Hdfs cache pools in the catalog. + */ + private List<HdfsCachePool> getAllHdfsCachePools() { + versionLock_.readLock().lock(); + try { + return ImmutableList.copyOf(hdfsCachePools_); + } finally { + versionLock_.readLock().unlock(); + } + } + + /** + * Get a snapshot view of all the roles in the catalog. 
+ */ + private List<Role> getAllRoles() { + versionLock_.readLock().lock(); + try { + return ImmutableList.copyOf(authPolicy_.getAllRoles()); + } finally { + versionLock_.readLock().unlock(); + } + } + + /** + * Adds a database in the topic update if its version is in the range + * ('fromVersion', 'toVersion']. It iterates through all the tables and functions of + * this database to determine if they can be included in the topic update. + */ + private void addDatabaseToCatalogDelta(Db db, long fromVersion, long toVersion, + TGetCatalogDeltaResponse resp) { + long dbVersion = db.getCatalogVersion(); + if (dbVersion > fromVersion && dbVersion <= toVersion) { + TCatalogObject catalogDb = + new TCatalogObject(TCatalogObjectType.DATABASE, dbVersion); + catalogDb.setDb(db.toThrift()); + resp.addToUpdated_objects(catalogDb); + } + for (Table tbl: getAllTables(db)) { + addTableToCatalogDelta(tbl, fromVersion, toVersion, resp); + } + for (Function fn: getAllFunctions(db)) { + addFunctionToCatalogDelta(fn, fromVersion, toVersion, resp); + } + } + + /** + * Get a snapshot view of all the tables in a database. + */ + private List<Table> getAllTables(Db db) { + Preconditions.checkNotNull(db); + versionLock_.readLock().lock(); + try { + return ImmutableList.copyOf(db.getTables()); + } finally { + versionLock_.readLock().unlock(); + } + } + + /** + * Get a snapshot view of all the functions in a database. + */ + private List<Function> getAllFunctions(Db db) { + Preconditions.checkNotNull(db); + versionLock_.readLock().lock(); + try { + return ImmutableList.copyOf(db.getFunctions(null, new PatternMatcher())); + } finally { + versionLock_.readLock().unlock(); + } + } + + /** + * Adds a table in the topic update if its version is in the range + * ('fromVersion', 'toVersion']. If the table's version is larger than 'toVersion' and + * the table has skipped a topic update 'MAX_NUM_SKIPPED_TOPIC_UPDATES' times, it is + * included in the topic update. 
This prevents tables that are updated frequently from + * skipping topic updates indefinitely, which would also violate the semantics of + * SYNC_DDL. + */ + private void addTableToCatalogDelta(Table tbl, long fromVersion, long toVersion, + TGetCatalogDeltaResponse resp) { + if (tbl.getCatalogVersion() <= toVersion) { + addTableToCatalogDeltaHelper(tbl, fromVersion, toVersion, resp); + } else { + TopicUpdateLog.Entry topicUpdateEntry = + topicUpdateLog_.getOrCreateLogEntry(tbl.getUniqueName()); + Preconditions.checkNotNull(topicUpdateEntry); + if (topicUpdateEntry.getNumSkippedTopicUpdates() >= MAX_NUM_SKIPPED_TOPIC_UPDATES) { + addTableToCatalogDeltaHelper(tbl, fromVersion, toVersion, resp); + } else { + LOG.info("Table " + tbl.getFullName() + " is skipping topic update " + + toVersion); + topicUpdateLog_.add(tbl.getUniqueName(), + new TopicUpdateLog.Entry( + topicUpdateEntry.getNumSkippedTopicUpdates() + 1, + topicUpdateEntry.getLastSentVersion(), + topicUpdateEntry.getLastSentCatalogUpdate())); } + } + } - for (DataSource dataSource: getDataSources()) { - TCatalogObject catalogObj = new TCatalogObject(TCatalogObjectType.DATA_SOURCE, - dataSource.getCatalogVersion()); - catalogObj.setData_source(dataSource.toThrift()); - resp.addToObjects(catalogObj); + /** + * Helper function that tries to add a table in a topic update. It acquires table's + * lock and checks if its version is in the ('fromVersion', 'toVersion'] range and how + * many consecutive times (if any) has the table skipped a topic update. 
+ */ + private void addTableToCatalogDeltaHelper(Table tbl, long fromVersion, long toVersion, + TGetCatalogDeltaResponse resp) { + TCatalogObject catalogTbl = + new TCatalogObject(TCatalogObjectType.TABLE, Catalog.INITIAL_CATALOG_VERSION); + tbl.getLock().lock(); + try { + long tblVersion = tbl.getCatalogVersion(); + if (tblVersion <= fromVersion) return; + TopicUpdateLog.Entry topicUpdateEntry = + topicUpdateLog_.getOrCreateLogEntry(tbl.getUniqueName()); + if (tblVersion > toVersion && + topicUpdateEntry.getNumSkippedTopicUpdates() < MAX_NUM_SKIPPED_TOPIC_UPDATES) { + LOG.info("Table " + tbl.getFullName() + " is skipping topic update " + + toVersion); + topicUpdateLog_.add(tbl.getUniqueName(), + new TopicUpdateLog.Entry( + topicUpdateEntry.getNumSkippedTopicUpdates() + 1, + topicUpdateEntry.getLastSentVersion(), + topicUpdateEntry.getLastSentCatalogUpdate())); + return; } - for (HdfsCachePool cachePool: hdfsCachePools_) { - TCatalogObject pool = new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL, - cachePool.getCatalogVersion()); - pool.setCache_pool(cachePool.toThrift()); - resp.addToObjects(pool); + try { + catalogTbl.setTable(tbl.toThrift()); + } catch (Exception e) { + LOG.error(String.format("Error calling toThrift() on table %s: %s", + tbl.getFullName(), e.getMessage()), e); + return; } + catalogTbl.setCatalog_version(tbl.getCatalogVersion()); + resp.addToUpdated_objects(catalogTbl); + } finally { + tbl.getLock().unlock(); + } + } - // Get all roles - for (Role role: authPolicy_.getAllRoles()) { - TCatalogObject thriftRole = new TCatalogObject(); - thriftRole.setRole(role.toThrift()); - thriftRole.setCatalog_version(role.getCatalogVersion()); - thriftRole.setType(role.getCatalogObjectType()); - resp.addToObjects(thriftRole); - - for (RolePrivilege p: role.getPrivileges()) { - TCatalogObject privilege = new TCatalogObject(); - privilege.setPrivilege(p.toThrift()); - privilege.setCatalog_version(p.getCatalogVersion()); - 
privilege.setType(p.getCatalogObjectType()); - resp.addToObjects(privilege); - } - } + /** + * Adds a function to the topic update if its version is in the range + * ('fromVersion', 'toVersion']. + */ + private void addFunctionToCatalogDelta(Function fn, long fromVersion, long toVersion, + TGetCatalogDeltaResponse resp) { + long fnVersion = fn.getCatalogVersion(); + if (fnVersion <= fromVersion || fnVersion > toVersion) return; + TCatalogObject function = + new TCatalogObject(TCatalogObjectType.FUNCTION, fnVersion); + function.setFn(fn.toThrift()); + resp.addToUpdated_objects(function); + } - // Each update should contain a single "TCatalog" object which is used to - // pass overall state on the catalog, such as the current version and the - // catalog service id. - TCatalogObject catalog = new TCatalogObject(); - catalog.setType(TCatalogObjectType.CATALOG); - // By setting the catalog version to the latest catalog version at this point, - // it ensure impalads will always bump their versions, even in the case where - // an object has been dropped. - catalog.setCatalog_version(getCatalogVersion()); - catalog.setCatalog(new TCatalog(catalogServiceId_)); - resp.addToObjects(catalog); - - // The max version is the max catalog version of all items in the update. - resp.setMax_catalog_version(getCatalogVersion()); - return resp; + /** + * Adds a data source to the topic update if its version is in the range + * ('fromVersion', 'toVersion']. 
+ */ + private void addDataSourceToCatalogDelta(DataSource dataSource, long fromVersion, + long toVersion, TGetCatalogDeltaResponse resp) { + long dsVersion = dataSource.getCatalogVersion(); + if (dsVersion <= fromVersion || dsVersion > toVersion) return; + TCatalogObject catalogObj = + new TCatalogObject(TCatalogObjectType.DATA_SOURCE, dsVersion); + catalogObj.setData_source(dataSource.toThrift()); + resp.addToUpdated_objects(catalogObj); + } + + /** + * Adds a HDFS cache pool to the topic update if its version is in the range + * ('fromVersion', 'toVersion']. + */ + private void addHdfsCachePoolToCatalogDelta(HdfsCachePool cachePool, long fromVersion, + long toVersion, TGetCatalogDeltaResponse resp) { + long cpVersion = cachePool.getCatalogVersion(); + if (cpVersion <= fromVersion || cpVersion > toVersion) { + return; + } + TCatalogObject pool = + new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL, cpVersion); + pool.setCache_pool(cachePool.toThrift()); + resp.addToUpdated_objects(pool); + } + + + /** + * Adds a role to the topic update if its version is in the range + * ('fromVersion', 'toVersion']. It iterates through all the privileges of this role to + * determine if they can be inserted in the topic update. + */ + private void addRoleToCatalogDelta(Role role, long fromVersion, long toVersion, + TGetCatalogDeltaResponse resp) { + long roleVersion = role.getCatalogVersion(); + if (roleVersion > fromVersion && roleVersion <= toVersion) { + TCatalogObject thriftRole = + new TCatalogObject(TCatalogObjectType.ROLE, roleVersion); + thriftRole.setRole(role.toThrift()); + resp.addToUpdated_objects(thriftRole); + } + for (RolePrivilege p: getAllPrivileges(role)) { + addRolePrivilegeToCatalogDelta(p, fromVersion, toVersion, resp); + } + } + + /** + * Get a snapshot view of all the privileges in a role. 
+ */ + private List<RolePrivilege> getAllPrivileges(Role role) { + Preconditions.checkNotNull(role); + versionLock_.readLock().lock(); + try { + return ImmutableList.copyOf(role.getPrivileges()); } finally { - catalogLock_.readLock().unlock(); + versionLock_.readLock().unlock(); } } /** + * Adds a role privilege to the topic update if its version is in the range + * ('fromVersion', 'toVersion']. + */ + private void addRolePrivilegeToCatalogDelta(RolePrivilege priv, long fromVersion, + long toVersion, TGetCatalogDeltaResponse resp) { + long privVersion = priv.getCatalogVersion(); + if (privVersion <= fromVersion || privVersion > toVersion) return; + TCatalogObject privilege = + new TCatalogObject(TCatalogObjectType.PRIVILEGE, privVersion); + privilege.setPrivilege(priv.toThrift()); + resp.addToUpdated_objects(privilege); + } + + /** * Returns all user defined functions (aggregate and scalar) in the specified database. * Functions are not returned in a defined order. */ @@ -710,6 +1027,31 @@ public class CatalogServiceCatalog extends Catalog { tblsToBackgroundLoad.add(new TTableName(dbName, tableName.toLowerCase())); } } + + if (existingDb != null) { + // Identify any removed functions and add them to the delta log. 
+ for (Map.Entry<String, List<Function>> e: + existingDb.getAllFunctions().entrySet()) { + for (Function fn: e.getValue()) { + if (newDb.getFunction(fn, + Function.CompareMode.IS_INDISTINGUISHABLE) == null) { + fn.setCatalogVersion(incrementAndGetCatalogVersion()); + deleteLog_.addRemovedObject(fn.toTCatalogObject()); + } + } + } + + // Identify any deleted tables and add them to the delta log + Set<String> oldTableNames = Sets.newHashSet(existingDb.getAllTableNames()); + Set<String> newTableNames = Sets.newHashSet(newDb.getAllTableNames()); + oldTableNames.removeAll(newTableNames); + for (String removedTableName: oldTableNames) { + Table removedTable = IncompleteTable.createUninitializedTable(existingDb, + removedTableName); + removedTable.setCatalogVersion(incrementAndGetCatalogVersion()); + deleteLog_.addRemovedObject(removedTable.toTCatalogObject()); + } + } return Pair.create(newDb, tblsToBackgroundLoad); } catch (Exception e) { LOG.warn("Encountered an exception while invalidating database: " + dbName + @@ -720,22 +1062,35 @@ public class CatalogServiceCatalog extends Catalog { /** * Resets this catalog instance by clearing all cached table and database metadata. + * Returns the current catalog version before reset has taken any effect. The + * requesting impalad will use that version to determine when the + * effects of reset have been applied to its local catalog cache. */ - public void reset() throws CatalogException { - LOG.info("Invalidating all metadata."); - + public long reset() throws CatalogException { + long currentCatalogVersion = getCatalogVersion(); + LOG.info("Invalidating all metadata. Version: " + currentCatalogVersion); // First update the policy metadata. if (sentryProxy_ != null) { // Sentry Service is enabled. try { // Update the authorization policy, waiting for the result to complete. 
- sentryProxy_.refresh(); + sentryProxy_.refresh(true); } catch (Exception e) { throw new CatalogException("Error updating authorization policy: ", e); } } - catalogLock_.writeLock().lock(); + // Update the HDFS cache pools + CachePoolReader reader = new CachePoolReader(true); + reader.run(); + + versionLock_.writeLock().lock(); + // Assign new versions to all the loaded data sources. + for (DataSource dataSource: getDataSources()) { + dataSource.setCatalogVersion(incrementAndGetCatalogVersion()); + } + + // Update db and table metadata try { // Not all Java UDFs are persisted to the metastore. The ones which aren't // should be restored once the catalog has been invalidated. @@ -757,6 +1112,16 @@ public class CatalogServiceCatalog extends Catalog { } } dbCache_.set(newDbCache); + + // Identify any deleted databases and add them to the delta log. + Set<String> oldDbNames = oldDbCache.keySet(); + Set<String> newDbNames = newDbCache.keySet(); + oldDbNames.removeAll(newDbNames); + for (String dbName: oldDbNames) { + Db removedDb = oldDbCache.get(dbName); + updateDeleteLog(removedDb); + } + // Submit tables for background loading. for (TTableName tblName: tblsToBackgroundLoad) { tableLoadingMgr_.backgroundLoad(tblName); @@ -765,21 +1130,26 @@ public class CatalogServiceCatalog extends Catalog { LOG.error(e); throw new CatalogException("Error initializing Catalog. Catalog may be empty.", e); } finally { - catalogLock_.writeLock().unlock(); + versionLock_.writeLock().unlock(); } LOG.info("Invalidated all metadata."); + return currentCatalogVersion; } /** * Adds a database name to the metadata cache and returns the database's * new Db object. Used by CREATE DATABASE statements. 
*/ - public Db addDb(String dbName, org.apache.hadoop.hive.metastore.api.Database msDb) - throws ImpalaException { + public Db addDb(String dbName, org.apache.hadoop.hive.metastore.api.Database msDb) { Db newDb = new Db(dbName, this, msDb); - newDb.setCatalogVersion(incrementAndGetCatalogVersion()); - addDb(newDb); - return newDb; + versionLock_.writeLock().lock(); + try { + newDb.setCatalogVersion(incrementAndGetCatalogVersion()); + addDb(newDb); + return newDb; + } finally { + versionLock_.writeLock().unlock(); + } } /** @@ -789,11 +1159,36 @@ public class CatalogServiceCatalog extends Catalog { */ @Override public Db removeDb(String dbName) { - Db removedDb = super.removeDb(dbName); - if (removedDb != null) { - removedDb.setCatalogVersion(incrementAndGetCatalogVersion()); + versionLock_.writeLock().lock(); + try { + Db removedDb = super.removeDb(dbName); + if (removedDb != null) updateDeleteLog(removedDb); + return removedDb; + } finally { + versionLock_.writeLock().unlock(); + } + } + + /** + * Helper function to clean up the state associated with a removed database. It creates + * the entries in the delete log for 'db' as well as for its tables and functions + * (if any). 
+ */ + private void updateDeleteLog(Db db) { + Preconditions.checkNotNull(db); + Preconditions.checkState(versionLock_.isWriteLockedByCurrentThread()); + if (!db.isSystemDb()) { + for (Table tbl: db.getTables()) { + tbl.setCatalogVersion(incrementAndGetCatalogVersion()); + deleteLog_.addRemovedObject(tbl.toMinimalTCatalogObject()); + } + for (Function fn: db.getFunctions(null, new PatternMatcher())) { + fn.setCatalogVersion(incrementAndGetCatalogVersion()); + deleteLog_.addRemovedObject(fn.toTCatalogObject()); + } } - return removedDb; + db.setCatalogVersion(incrementAndGetCatalogVersion()); + deleteLog_.addRemovedObject(db.toTCatalogObject()); } /** @@ -804,8 +1199,13 @@ public class CatalogServiceCatalog extends Catalog { Db db = getDb(dbName); if (db == null) return null; Table incompleteTable = IncompleteTable.createUninitializedTable(db, tblName); - incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion()); - db.addTable(incompleteTable); + versionLock_.writeLock().lock(); + try { + incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion()); + db.addTable(incompleteTable); + } finally { + versionLock_.writeLock().unlock(); + } return db.getTable(tblName); } @@ -825,14 +1225,14 @@ public class CatalogServiceCatalog extends Catalog { long previousCatalogVersion; // Return the table if it is already loaded or submit a new load request. 
- catalogLock_.readLock().lock(); + versionLock_.readLock().lock(); try { Table tbl = getTable(dbName, tblName); if (tbl == null || tbl.isLoaded()) return tbl; previousCatalogVersion = tbl.getCatalogVersion(); loadReq = tableLoadingMgr_.loadAsync(tableName); } finally { - catalogLock_.readLock().unlock(); + versionLock_.readLock().unlock(); } Preconditions.checkNotNull(loadReq); try { @@ -850,7 +1250,7 @@ public class CatalogServiceCatalog extends Catalog { */ private Table replaceTableIfUnchanged(Table updatedTbl, long expectedCatalogVersion) throws DatabaseNotFoundException { - catalogLock_.writeLock().lock(); + versionLock_.writeLock().lock(); try { Db db = getDb(updatedTbl.getDb().getName()); if (db == null) { @@ -868,7 +1268,7 @@ public class CatalogServiceCatalog extends Catalog { db.addTable(updatedTbl); return updatedTbl; } finally { - catalogLock_.writeLock().unlock(); + versionLock_.writeLock().unlock(); } } @@ -879,12 +1279,17 @@ public class CatalogServiceCatalog extends Catalog { public Table removeTable(String dbName, String tblName) { Db parentDb = getDb(dbName); if (parentDb == null) return null; - - Table removedTable = parentDb.removeTable(tblName); - if (removedTable != null) { - removedTable.setCatalogVersion(incrementAndGetCatalogVersion()); + versionLock_.writeLock().lock(); + try { + Table removedTable = parentDb.removeTable(tblName); + if (removedTable != null) { + removedTable.setCatalogVersion(incrementAndGetCatalogVersion()); + deleteLog_.addRemovedObject(removedTable.toMinimalTCatalogObject()); + } + return removedTable; + } finally { + versionLock_.writeLock().unlock(); } - return removedTable; } /** @@ -894,11 +1299,17 @@ public class CatalogServiceCatalog extends Catalog { */ @Override public Function removeFunction(Function desc) { - Function removedFn = super.removeFunction(desc); - if (removedFn != null) { - removedFn.setCatalogVersion(incrementAndGetCatalogVersion()); + versionLock_.writeLock().lock(); + try { + Function removedFn 
= super.removeFunction(desc); + if (removedFn != null) { + removedFn.setCatalogVersion(incrementAndGetCatalogVersion()); + deleteLog_.addRemovedObject(removedFn.toTCatalogObject()); + } + return removedFn; + } finally { + versionLock_.writeLock().unlock(); } - return removedFn; } /** @@ -909,9 +1320,14 @@ public class CatalogServiceCatalog extends Catalog { public boolean addFunction(Function fn) { Db db = getDb(fn.getFunctionName().getDb()); if (db == null) return false; - if (db.addFunction(fn)) { - fn.setCatalogVersion(incrementAndGetCatalogVersion()); - return true; + versionLock_.writeLock().lock(); + try { + if (db.addFunction(fn)) { + fn.setCatalogVersion(incrementAndGetCatalogVersion()); + return true; + } + } finally { + versionLock_.writeLock().unlock(); } return false; } @@ -922,20 +1338,31 @@ public class CatalogServiceCatalog extends Catalog { */ @Override public boolean addDataSource(DataSource dataSource) { - if (dataSources_.add(dataSource)) { - dataSource.setCatalogVersion(incrementAndGetCatalogVersion()); - return true; + versionLock_.writeLock().lock(); + try { + if (dataSources_.add(dataSource)) { + dataSource.setCatalogVersion(incrementAndGetCatalogVersion()); + return true; + } + } finally { + versionLock_.writeLock().unlock(); } return false; } @Override public DataSource removeDataSource(String dataSourceName) { - DataSource dataSource = dataSources_.remove(dataSourceName); - if (dataSource != null) { - dataSource.setCatalogVersion(incrementAndGetCatalogVersion()); + versionLock_.writeLock().lock(); + try { + DataSource dataSource = dataSources_.remove(dataSourceName); + if (dataSource != null) { + dataSource.setCatalogVersion(incrementAndGetCatalogVersion()); + deleteLog_.addRemovedObject(dataSource.toTCatalogObject()); + } + return dataSource; + } finally { + versionLock_.writeLock().unlock(); } - return dataSource; } /** @@ -969,20 +1396,30 @@ public class CatalogServiceCatalog extends Catalog { /** * Renames a table. 
Equivalent to an atomic drop + add of the table. Returns - * the new Table object with an incremented catalog version or null if the - * drop or add were unsuccessful. If null is returned, then the catalog cache - * is in one of the following two states: - * 1. Old table was not removed, and new table was not added - * 2. Old table was removed, but new table was not added + * a pair of tables containing the removed table (or null if the table drop was not + * successful) and the new table (or null if either the drop of the old one or the + * add of the new table was not successful). Depending on the return value, the catalog + * cache is in one of the following states: + * 1. null, null: Old table was not removed and new table was not added. + * 2. null, T_new: Invalid configuration + * 3. T_old, null: Old table was removed but new table was not added. + * 4. T_old, T_new: Old table was removed and new table was added. */ - public Table renameTable(TTableName oldTableName, TTableName newTableName) + public Pair<Table, Table> renameTable(TTableName oldTableName, TTableName newTableName) throws CatalogException { // Remove the old table name from the cache and add the new table. 
Db db = getDb(oldTableName.getDb_name()); if (db == null) return null; - Table oldTable = db.removeTable(oldTableName.getTable_name()); - if (oldTable == null) return null; - return addTable(newTableName.getDb_name(), newTableName.getTable_name()); + versionLock_.writeLock().lock(); + try { + Table oldTable = + removeTable(oldTableName.getDb_name(), oldTableName.getTable_name()); + if (oldTable == null) return Pair.create(null, null); + return Pair.create(oldTable, + addTable(newTableName.getDb_name(), newTableName.getTable_name())); + } finally { + versionLock_.writeLock().unlock(); + } } /** @@ -1004,7 +1441,7 @@ public class CatalogServiceCatalog extends Catalog { } try { long newCatalogVersion = incrementAndGetCatalogVersion(); - catalogLock_.writeLock().unlock(); + versionLock_.writeLock().unlock(); try (MetaStoreClient msClient = getMetaStoreClient()) { org.apache.hadoop.hive.metastore.api.Table msTbl = null; try { @@ -1019,7 +1456,7 @@ public class CatalogServiceCatalog extends Catalog { LOG.info(String.format("Refreshed table metadata: %s", tbl.getFullName())); return tbl.toTCatalogObject(); } finally { - Preconditions.checkState(!catalogLock_.isWriteLockedByCurrentThread()); + Preconditions.checkState(!versionLock_.isWriteLockedByCurrentThread()); tbl.getLock().unlock(); } } @@ -1123,9 +1560,7 @@ public class CatalogServiceCatalog extends Catalog { try { msDb = msClient.getHiveClient().getDatabase(dbName); Preconditions.checkNotNull(msDb); - db = new Db(dbName, this, msDb); - db.setCatalogVersion(incrementAndGetCatalogVersion()); - addDb(db); + addDb(dbName, msDb); dbWasAdded.setRef(true); } catch (TException e) { // The Metastore database cannot be get. Log the error and return. @@ -1138,9 +1573,8 @@ public class CatalogServiceCatalog extends Catalog { // Add a new uninitialized table to the table cache, effectively invalidating // any existing entry. The metadata for the table will be loaded lazily, on the // on the next access to the table. 
- Table newTable = IncompleteTable.createUninitializedTable(db, tblName); - newTable.setCatalogVersion(incrementAndGetCatalogVersion()); - db.addTable(newTable); + Table newTable = addTable(dbName, tblName); + Preconditions.checkNotNull(newTable); if (loadInBackground_) { tableLoadingMgr_.backgroundLoad(new TTableName(dbName.toLowerCase(), tblName.toLowerCase())); @@ -1148,7 +1582,10 @@ public class CatalogServiceCatalog extends Catalog { if (dbWasAdded.getRef()) { // The database should always have a lower catalog version than the table because // it needs to be created before the table can be added. - Preconditions.checkState(db.getCatalogVersion() < newTable.getCatalogVersion()); + Db addedDb = newTable.getDb(); + Preconditions.checkNotNull(addedDb); + Preconditions.checkState( + addedDb.getCatalogVersion() < newTable.getCatalogVersion()); } return newTable.toTCatalogObject(); } @@ -1158,14 +1595,14 @@ public class CatalogServiceCatalog extends Catalog { * If a role with the same name already exists it will be overwritten. */ public Role addRole(String roleName, Set<String> grantGroups) { - catalogLock_.writeLock().lock(); + versionLock_.writeLock().lock(); try { Role role = new Role(roleName, grantGroups); role.setCatalogVersion(incrementAndGetCatalogVersion()); authPolicy_.addRole(role); return role; } finally { - catalogLock_.writeLock().unlock(); + versionLock_.writeLock().unlock(); } } @@ -1175,14 +1612,19 @@ public class CatalogServiceCatalog extends Catalog { * exists. 
*/ public Role removeRole(String roleName) { - catalogLock_.writeLock().lock(); + versionLock_.writeLock().lock(); try { Role role = authPolicy_.removeRole(roleName); if (role == null) return null; + for (RolePrivilege priv: role.getPrivileges()) { + priv.setCatalogVersion(incrementAndGetCatalogVersion()); + deleteLog_.addRemovedObject(priv.toTCatalogObject()); + } role.setCatalogVersion(incrementAndGetCatalogVersion()); + deleteLog_.addRemovedObject(role.toTCatalogObject()); return role; } finally { - catalogLock_.writeLock().unlock(); + versionLock_.writeLock().unlock(); } } @@ -1192,14 +1634,14 @@ public class CatalogServiceCatalog extends Catalog { */ public Role addRoleGrantGroup(String roleName, String groupName) throws CatalogException { - catalogLock_.writeLock().lock(); + versionLock_.writeLock().lock(); try { Role role = authPolicy_.addGrantGroup(roleName, groupName); Preconditions.checkNotNull(role); role.setCatalogVersion(incrementAndGetCatalogVersion()); return role; } finally { - catalogLock_.writeLock().unlock(); + versionLock_.writeLock().unlock(); } } @@ -1209,14 +1651,14 @@ public class CatalogServiceCatalog extends Catalog { */ public Role removeRoleGrantGroup(String roleName, String groupName) throws CatalogException { - catalogLock_.writeLock().lock(); + versionLock_.writeLock().lock(); try { Role role = authPolicy_.removeGrantGroup(roleName, groupName); Preconditions.checkNotNull(role); role.setCatalogVersion(incrementAndGetCatalogVersion()); return role; } finally { - catalogLock_.writeLock().unlock(); + versionLock_.writeLock().unlock(); } } @@ -1227,7 +1669,7 @@ public class CatalogServiceCatalog extends Catalog { */ public RolePrivilege addRolePrivilege(String roleName, TPrivilege thriftPriv) throws CatalogException { - catalogLock_.writeLock().lock(); + versionLock_.writeLock().lock(); try { Role role = authPolicy_.getRole(roleName); if (role == null) throw new CatalogException("Role does not exist: " + roleName); @@ -1236,7 +1678,7 @@ 
public class CatalogServiceCatalog extends Catalog { authPolicy_.addPrivilege(priv); return priv; } finally { - catalogLock_.writeLock().unlock(); + versionLock_.writeLock().unlock(); } } @@ -1247,7 +1689,7 @@ public class CatalogServiceCatalog extends Catalog { */ public RolePrivilege removeRolePrivilege(String roleName, TPrivilege thriftPriv) throws CatalogException { - catalogLock_.writeLock().lock(); + versionLock_.writeLock().lock(); try { Role role = authPolicy_.getRole(roleName); if (role == null) throw new CatalogException("Role does not exist: " + roleName); @@ -1255,9 +1697,10 @@ public class CatalogServiceCatalog extends Catalog { role.removePrivilege(thriftPriv.getPrivilege_name()); if (rolePrivilege == null) return null; rolePrivilege.setCatalogVersion(incrementAndGetCatalogVersion()); + deleteLog_.addRemovedObject(rolePrivilege.toTCatalogObject()); return rolePrivilege; } finally { - catalogLock_.writeLock().unlock(); + versionLock_.writeLock().unlock(); } } @@ -1268,13 +1711,13 @@ public class CatalogServiceCatalog extends Catalog { */ public RolePrivilege getRolePrivilege(String roleName, TPrivilege privSpec) throws CatalogException { - catalogLock_.readLock().lock(); + versionLock_.readLock().lock(); try { Role role = authPolicy_.getRole(roleName); if (role == null) throw new CatalogException("Role does not exist: " + roleName); return role.getPrivilege(privSpec.getPrivilege_name()); } finally { - catalogLock_.readLock().unlock(); + versionLock_.readLock().unlock(); } } @@ -1282,11 +1725,11 @@ public class CatalogServiceCatalog extends Catalog { * Increments the current Catalog version and returns the new value. 
*/ public long incrementAndGetCatalogVersion() { - catalogLock_.writeLock().lock(); + versionLock_.writeLock().lock(); try { return ++catalogVersion_; } finally { - catalogLock_.writeLock().unlock(); + versionLock_.writeLock().unlock(); } } @@ -1294,16 +1737,15 @@ public class CatalogServiceCatalog extends Catalog { * Returns the current Catalog version. */ public long getCatalogVersion() { - catalogLock_.readLock().lock(); + versionLock_.readLock().lock(); try { return catalogVersion_; } finally { - catalogLock_.readLock().unlock(); + versionLock_.readLock().unlock(); } } - public ReentrantReadWriteLock getLock() { return catalogLock_; } - + public ReentrantReadWriteLock getLock() { return versionLock_; } public SentryProxy getSentryProxy() { return sentryProxy_; } public AuthorizationPolicy getAuthPolicy() { return authPolicy_; } @@ -1320,7 +1762,7 @@ public class CatalogServiceCatalog extends Catalog { } try { long newCatalogVersion = incrementAndGetCatalogVersion(); - catalogLock_.writeLock().unlock(); + versionLock_.writeLock().unlock(); HdfsTable hdfsTable = (HdfsTable) tbl; HdfsPartition hdfsPartition = hdfsTable .getPartitionFromThriftPartitionSpec(partitionSpec); @@ -1355,8 +1797,111 @@ public class CatalogServiceCatalog extends Catalog { hdfsTable.getFullName(), partitionName)); return hdfsTable.toTCatalogObject(); } finally { - Preconditions.checkState(!catalogLock_.isWriteLockedByCurrentThread()); + Preconditions.checkState(!versionLock_.isWriteLockedByCurrentThread()); tbl.getLock().unlock(); } } + + public CatalogDeltaLog getDeleteLog() { return deleteLog_; } + + /** + * Returns the version of the topic update that an operation using SYNC_DDL must wait + * for in order to ensure that its result set ('result') has been broadcast to all the + * coordinators. For operations that don't produce a result set, e.g. INVALIDATE + * METADATA, return the version specified in 'result.version'. 
+ */ + public long waitForSyncDdlVersion(TCatalogUpdateResult result) throws CatalogException { + if (!result.isSetUpdated_catalog_objects() && + !result.isSetRemoved_catalog_objects()) { + return result.getVersion(); + } + long lastSentTopicUpdate = lastSentTopicUpdate_.get(); + // Maximum number of attempts (topic updates) to find the catalog topic version that + // an operation using SYNC_DDL must wait for. + long maxNumAttempts = 5; + if (result.isSetUpdated_catalog_objects()) { + maxNumAttempts = + result.getUpdated_catalog_objects().size() * (MAX_NUM_SKIPPED_TOPIC_UPDATES + 1); + } + long numAttempts = 0; + long begin = System.currentTimeMillis(); + long versionToWaitFor = -1; + while (versionToWaitFor == -1) { + if (LOG.isTraceEnabled()) { + LOG.trace("waitForSyncDdlVersion() attempt: " + numAttempts); + } + // Examine the topic update log to determine the latest topic update that + // covers the added/modified/deleted objects in 'result'. + long topicVersionForUpdates = + getCoveringTopicUpdateVersion(result.getUpdated_catalog_objects()); + long topicVersionForDeletes = + getCoveringTopicUpdateVersion(result.getRemoved_catalog_objects()); + if (topicVersionForUpdates == -1 || topicVersionForDeletes == -1) { + // Wait for the next topic update. + synchronized(topicUpdateLog_) { + try { + topicUpdateLog_.wait(TOPIC_UPDATE_WAIT_TIMEOUT_MS); + } catch (InterruptedException e) { + // Ignore + } + } + long currentTopicUpdate = lastSentTopicUpdate_.get(); + // Don't count time-based exits from the wait() toward the maxNumAttempts + // threshold. + if (lastSentTopicUpdate != currentTopicUpdate) { + ++numAttempts; + if (numAttempts > maxNumAttempts) { + throw new CatalogException("Couldn't retrieve the catalog topic version " + + "for the SYNC_DDL operation after " + maxNumAttempts + " attempts." 
+ + "The operation has been successfully executed but its effects may have " + + "not been broadcast to all the coordinators."); + } + lastSentTopicUpdate = currentTopicUpdate; + } + } else { + versionToWaitFor = Math.max(topicVersionForDeletes, topicVersionForUpdates); + } + } + Preconditions.checkState(versionToWaitFor >= 0); + LOG.info("Operation using SYNC_DDL is waiting for catalog topic version: " + + versionToWaitFor + ". Time to identify topic version (msec): " + + (System.currentTimeMillis() - begin)); + return versionToWaitFor; + } + + /** + * Returns the version of the topic update that covers a set of TCatalogObjects. + * A topic update U covers a TCatalogObject T, corresponding to a catalog object O, + * if last_sent_version(O) >= catalog_version(T) && catalog_version(U) >= + * last_topic_update(O). The first condition indicates that a version of O that is + * larger or equal to the version in T has been added to a topic update. The second + * condition indicates that U is either the update to include O or an update following + * the one to include O. Returns -1 if there is a catalog object in 'tCatalogObjects' + * which doesn't satisfy the above conditions. + */ + private long getCoveringTopicUpdateVersion(List<TCatalogObject> tCatalogObjects) { + if (tCatalogObjects == null || tCatalogObjects.isEmpty()) { + return lastSentTopicUpdate_.get(); + } + long versionToWaitFor = -1; + for (TCatalogObject tCatalogObject: tCatalogObjects) { + TopicUpdateLog.Entry topicUpdateEntry = + topicUpdateLog_.get(Catalog.toCatalogObjectKey(tCatalogObject)); + // There are two reasons for which a topic update log entry cannot be found: + // a) It corresponds to a new catalog object that hasn't been processed by a catalog + // update yet. + // b) It corresponds to a catalog object that hasn't been modified for at least + // TOPIC_UPDATE_LOG_GC_FREQUENCY updates and hence its entry was garbage + // collected. 
+ // In both cases, -1 is returned to indicate that we're waiting for the + // entry to show up in the topic update log. + if (topicUpdateEntry == null || + topicUpdateEntry.getLastSentVersion() < tCatalogObject.getCatalog_version()) { + return -1; + } + versionToWaitFor = + Math.max(versionToWaitFor, topicUpdateEntry.getLastSentCatalogUpdate()); + } + return versionToWaitFor; + } } http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/DataSource.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/DataSource.java b/fe/src/main/java/org/apache/impala/catalog/DataSource.java index e9601d7..f59f3be 100644 --- a/fe/src/main/java/org/apache/impala/catalog/DataSource.java +++ b/fe/src/main/java/org/apache/impala/catalog/DataSource.java @@ -19,6 +19,7 @@ package org.apache.impala.catalog; import org.apache.hadoop.fs.Path; +import org.apache.impala.thrift.TCatalogObject; import org.apache.impala.thrift.TCatalogObjectType; import org.apache.impala.thrift.TDataSource; import com.google.common.base.Objects; @@ -27,13 +28,12 @@ import com.google.common.base.Objects; * Represents a data source in the catalog. Contains the data source name and all * information needed to locate and load the data source. */ -public class DataSource implements CatalogObject { +public class DataSource extends CatalogObjectImpl { private final String dataSrcName_; private final String className_; private final String apiVersionString_; // Qualified path to the data source. 
private final String location_; - private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION; public DataSource(String dataSrcName, String location, String className, String apiVersionString) { @@ -54,16 +54,9 @@ public class DataSource implements CatalogObject { } @Override - public long getCatalogVersion() { return catalogVersion_; } - - @Override - public void setCatalogVersion(long newVersion) { catalogVersion_ = newVersion; } - - @Override public String getName() { return dataSrcName_; } - @Override - public boolean isLoaded() { return true; } + public String getUniqueName() { return "DATA_SOURCE:" + dataSrcName_.toLowerCase(); } public String getLocation() { return location_; } public String getClassName() { return className_; } @@ -85,4 +78,11 @@ public class DataSource implements CatalogObject { public static String debugString(TDataSource thrift) { return fromThrift(thrift).debugString(); } + + public TCatalogObject toTCatalogObject() { + TCatalogObject catalogObj = + new TCatalogObject(getCatalogObjectType(), getCatalogVersion()); + catalogObj.setData_source(toThrift()); + return catalogObj; + } } http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/Db.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java index 074ff92..f1c9c8e 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Db.java +++ b/fe/src/main/java/org/apache/impala/catalog/Db.java @@ -34,6 +34,7 @@ import org.apache.impala.catalog.Function; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.common.JniUtil; +import org.apache.impala.thrift.TCatalogObject; import org.apache.impala.thrift.TCatalogObjectType; import org.apache.impala.thrift.TDatabase; import org.apache.impala.thrift.TFunction; @@ -59,11 +60,10 @@ import 
com.google.common.collect.Maps; * value is the base64 representation of the thrift serialized function object. * */ -public class Db implements CatalogObject { +public class Db extends CatalogObjectImpl { private static final Logger LOG = LoggerFactory.getLogger(Db.class); private final Catalog parentCatalog_; private final TDatabase thriftDb_; - private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION; public static final String FUNCTION_INDEX_PREFIX = "impala_registered_function_"; @@ -134,16 +134,14 @@ public class Db implements CatalogObject { @Override public String getName() { return thriftDb_.getDb_name(); } @Override - public TCatalogObjectType getCatalogObjectType() { - return TCatalogObjectType.DATABASE; - } + public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.DATABASE; } + @Override + public String getUniqueName() { return "DATABASE:" + getName().toLowerCase(); } /** * Adds a table to the table cache. */ - public void addTable(Table table) { - tableCache_.add(table); - } + public void addTable(Table table) { tableCache_.add(table); } /** * Gets all table names in the table cache. @@ -165,9 +163,7 @@ public class Db implements CatalogObject { * Returns the Table with the given name if present in the table cache or null if the * table does not exist in the cache. */ - public Table getTable(String tblName) { - return tableCache_.get(tblName); - } + public Table getTable(String tblName) { return tableCache_.get(tblName); } /** * Removes the table name and any cached metadata from the Table cache. 
@@ -495,11 +491,10 @@ public class Db implements CatalogObject { return result; } - @Override - public long getCatalogVersion() { return catalogVersion_; } - @Override - public void setCatalogVersion(long newVersion) { catalogVersion_ = newVersion; } - - @Override - public boolean isLoaded() { return true; } + public TCatalogObject toTCatalogObject() { + TCatalogObject catalogObj = + new TCatalogObject(getCatalogObjectType(), getCatalogVersion()); + catalogObj.setDb(toThrift()); + return catalogObj; + } } http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/Function.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/Function.java b/fe/src/main/java/org/apache/impala/catalog/Function.java index 80316a6..03cd867 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Function.java +++ b/fe/src/main/java/org/apache/impala/catalog/Function.java @@ -49,7 +49,7 @@ import com.google.common.collect.Lists; * - Builtin functions, which are recreated after every restart of the * catalog. (persisted, visible to Impala) */ -public class Function implements CatalogObject { +public class Function extends CatalogObjectImpl { // Enum for how to compare function signatures. // For decimal types, the type in the function can be a wildcard, i.e. decimal(*,*). // The wildcard can *only* exist as function type, the caller will always be a @@ -106,7 +106,6 @@ public class Function implements CatalogObject { // Set to true for functions that survive service restarts, including all builtins, // native and IR functions, but only Java functions created without a signature. 
private boolean isPersistent_; - private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION; public Function(FunctionName name, Type[] argTypes, Type retType, boolean varArgs) { @@ -298,15 +297,12 @@ public class Function implements CatalogObject { @Override public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.FUNCTION; } - - @Override - public long getCatalogVersion() { return catalogVersion_; } - - @Override - public void setCatalogVersion(long newVersion) { catalogVersion_ = newVersion; } - @Override public String getName() { return getFunctionName().toString(); } + @Override + public String getUniqueName() { + return "FUNCTION:" + name_.toString() + "(" + signatureString() + ")"; + } // Child classes must override this function. public String toSql(boolean ifNotExists) { return ""; } @@ -315,7 +311,7 @@ public class Function implements CatalogObject { TCatalogObject result = new TCatalogObject(); result.setType(TCatalogObjectType.FUNCTION); result.setFn(toThrift()); - result.setCatalog_version(catalogVersion_); + result.setCatalog_version(getCatalogVersion()); return result; } @@ -372,9 +368,6 @@ public class Function implements CatalogObject { return function; } - @Override - public boolean isLoaded() { return true; } - // Returns the resolved symbol in the binary. The BE will do a lookup of 'symbol' // in the binary and try to resolve unmangled names. // If this function is expecting a return argument, retArgType is that type. 
It should http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/HdfsCachePool.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsCachePool.java b/fe/src/main/java/org/apache/impala/catalog/HdfsCachePool.java index 398bc87..6f752d4 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsCachePool.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsCachePool.java @@ -28,8 +28,7 @@ import com.google.common.base.Preconditions; * care about for cache pools is the cache pool name. In the future it may be desirable * to track additional metadata such as the owner, size, and current usage of the pool. */ -public class HdfsCachePool implements CatalogObject { - private long catalogVersion_; +public class HdfsCachePool extends CatalogObjectImpl { private final THdfsCachePool cachePool_; public HdfsCachePool(CachePoolInfo cachePoolInfo) { @@ -57,9 +56,5 @@ public class HdfsCachePool implements CatalogObject { @Override public String getName() { return cachePool_.getPool_name(); } @Override - public long getCatalogVersion() { return catalogVersion_; } - @Override - public void setCatalogVersion(long newVersion) { catalogVersion_ = newVersion; } - @Override - public boolean isLoaded() { return true; } -} \ No newline at end of file + public String getUniqueName() { return "HDFS_CACHE_POOL:" + getName().toLowerCase(); } +}
