http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java index 4c959b2..0e2e8b9 100644 --- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java @@ -20,6 +20,7 @@ package org.apache.impala.catalog; import com.google.common.base.Preconditions; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -29,6 +30,7 @@ import org.apache.thrift.TException; import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; import org.apache.impala.common.ImpalaException; +import org.apache.impala.util.PatternMatcher; import org.apache.impala.thrift.TCatalogObject; import org.apache.impala.thrift.TCatalogObjectType; import org.apache.impala.thrift.TDataSource; @@ -173,23 +175,31 @@ public class ImpaladCatalog extends Catalog { } } - // Now remove all objects from the catalog. Removing a database before removing - // its child tables/functions is fine. If that happens, the removal of the child - // object will be a no-op. + // Now remove all objects from the catalog. First remove low-level objects (tables, + // functions and privileges) and then the top-level objects (databases and roles). for (TCatalogObject catalogObject: req.getRemoved_objects()) { - removeCatalogObject(catalogObject, newCatalogVersion); + if (!isTopLevelCatalogObject(catalogObject)) { + removeCatalogObject(catalogObject); + } } + for (TCatalogObject catalogObject: req.getRemoved_objects()) { + if (isTopLevelCatalogObject(catalogObject)) { + removeCatalogObject(catalogObject); + } + } + + lastSyncedCatalogVersion_ = newCatalogVersion; // Cleanup old entries in the log. catalogDeltaLog_.garbageCollect(lastSyncedCatalogVersion_); isReady_.set(true); - // Notify all the threads waiting on a catalog update. synchronized (catalogUpdateEventNotifier_) { catalogUpdateEventNotifier_.notifyAll(); } - return new TUpdateCatalogCacheResponse(catalogServiceId_); + return new TUpdateCatalogCacheResponse(catalogServiceId_, + CatalogObjectVersionQueue.INSTANCE.getMinimumVersion()); } /** @@ -319,24 +329,10 @@ public class ImpaladCatalog extends Catalog { /** * Removes the matching TCatalogObject from the catalog, if one exists and its * catalog version is < the catalog version of this drop operation. - * Note that drop operations that come from statestore heartbeats always have a - * version of 0. To determine the drop version for statestore updates, - * the catalog version from the current update is used. This is okay because there - * can never be a catalog update from the statestore that contains a drop - * and an addition of the same object. For more details on how drop - * versioning works, see CatalogServerCatalog.java */ - private void removeCatalogObject(TCatalogObject catalogObject, - long currentCatalogUpdateVersion) { - // The TCatalogObject associated with a drop operation from a state store - // heartbeat will always have a version of zero. Because no update from - // the state store can contain both a drop and an addition of the same object, - // we can assume the drop version is the current catalog version of this update. - // If the TCatalogObject contains a version that != 0, it indicates the drop - // came from a direct update. - long dropCatalogVersion = catalogObject.getCatalog_version() == 0 ? - currentCatalogUpdateVersion : catalogObject.getCatalog_version(); - + private void removeCatalogObject(TCatalogObject catalogObject) { + Preconditions.checkState(catalogObject.getCatalog_version() != 0); + long dropCatalogVersion = catalogObject.getCatalog_version(); switch(catalogObject.getType()) { case DATABASE: removeDb(catalogObject.getDb(), dropCatalogVersion); @@ -360,7 +356,7 @@ public class ImpaladCatalog extends Catalog { case HDFS_CACHE_POOL: HdfsCachePool existingItem = hdfsCachePools_.get(catalogObject.getCache_pool().getPool_name()); - if (existingItem.getCatalogVersion() > catalogObject.getCatalog_version()) { + if (existingItem.getCatalogVersion() <= catalogObject.getCatalog_version()) { hdfsCachePools_.remove(catalogObject.getCache_pool().getPool_name()); } break; @@ -381,6 +377,15 @@ public class ImpaladCatalog extends Catalog { Db newDb = Db.fromTDatabase(thriftDb, this); newDb.setCatalogVersion(catalogVersion); addDb(newDb); + if (existingDb != null) { + CatalogObjectVersionQueue.INSTANCE.updateVersions( + existingDb.getCatalogVersion(), catalogVersion); + CatalogObjectVersionQueue.INSTANCE.removeAll(existingDb.getTables()); + CatalogObjectVersionQueue.INSTANCE.removeAll( + existingDb.getFunctions(null, new PatternMatcher())); + } else { + CatalogObjectVersionQueue.INSTANCE.addVersion(catalogVersion); + } } } @@ -414,6 +419,12 @@ public class ImpaladCatalog extends Catalog { if (existingFn == null || existingFn.getCatalogVersion() < catalogVersion) { db.addFunction(function); + if (existingFn != null) { + CatalogObjectVersionQueue.INSTANCE.updateVersions( + existingFn.getCatalogVersion(), catalogVersion); + } else { + CatalogObjectVersionQueue.INSTANCE.addVersion(catalogVersion); + } } } @@ -431,6 +442,11 @@ public class ImpaladCatalog extends Catalog { Db db = getDb(thriftDb.getDb_name()); if (db != null && db.getCatalogVersion() < dropCatalogVersion) { removeDb(db.getName()); + CatalogObjectVersionQueue.INSTANCE.removeVersion( + db.getCatalogVersion()); + CatalogObjectVersionQueue.INSTANCE.removeAll(db.getTables()); + CatalogObjectVersionQueue.INSTANCE.removeAll( + db.getFunctions(null, new PatternMatcher())); } } @@ -455,6 +471,8 @@ public class ImpaladCatalog extends Catalog { Function fn = db.getFunction(thriftFn.getSignature()); if (fn != null && fn.getCatalogVersion() < dropCatalogVersion) { db.removeFunction(thriftFn.getSignature()); + CatalogObjectVersionQueue.INSTANCE.removeVersion( + fn.getCatalogVersion()); } } @@ -463,6 +481,7 @@ public class ImpaladCatalog extends Catalog { // version of the drop, remove the function. if (existingRole != null && existingRole.getCatalogVersion() < dropCatalogVersion) { authPolicy_.removeRole(thriftRole.getRole_name()); + CatalogObjectVersionQueue.INSTANCE.removeAll(existingRole.getPrivileges()); } }
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/Role.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/Role.java b/fe/src/main/java/org/apache/impala/catalog/Role.java index 0b89866..b45ff22 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Role.java +++ b/fe/src/main/java/org/apache/impala/catalog/Role.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.impala.thrift.TCatalogObject; import org.apache.impala.thrift.TCatalogObjectType; import org.apache.impala.thrift.TRole; import com.google.common.base.Preconditions; @@ -30,11 +31,10 @@ import com.google.common.collect.Sets; /** * Represents a role in an authorization policy. This class is thread safe. */ -public class Role implements CatalogObject { +public class Role extends CatalogObjectImpl { private final TRole role_; // The last role ID assigned, starts at 0. private static AtomicInteger roleId_ = new AtomicInteger(0); - private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION; private final CatalogObjectCache<RolePrivilege> rolePrivileges_ = new CatalogObjectCache<RolePrivilege>(); @@ -134,11 +134,12 @@ public class Role implements CatalogObject { public String getName() { return role_.getRole_name(); } public int getId() { return role_.getRole_id(); } @Override - public synchronized long getCatalogVersion() { return catalogVersion_; } - @Override - public synchronized void setCatalogVersion(long newVersion) { - catalogVersion_ = newVersion; + public String getUniqueName() { return "ROLE:" + getName().toLowerCase(); } + + public TCatalogObject toTCatalogObject() { + TCatalogObject catalogObject = + new TCatalogObject(getCatalogObjectType(), getCatalogVersion()); + catalogObject.setRole(toThrift()); + return catalogObject; } - @Override - public boolean isLoaded() { return true; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java b/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java index 87277af..ef3717c 100644 --- a/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java +++ b/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.log4j.Logger; +import org.apache.impala.thrift.TCatalogObject; import org.apache.impala.thrift.TCatalogObjectType; import org.apache.impala.thrift.TPrivilege; import org.apache.impala.thrift.TPrivilegeLevel; @@ -33,16 +34,14 @@ import com.google.common.collect.Lists; * Represents a privilege that has been granted to a role in an authorization policy. * This class is thread safe. */ -public class RolePrivilege implements CatalogObject { +public class RolePrivilege extends CatalogObjectImpl { private static final Logger LOG = Logger.getLogger(AuthorizationPolicy.class); // These Joiners are used to build role names. For simplicity, the role name we // use can also be sent to the Sentry library to perform authorization checks // so we build them in the same format. private static final Joiner AUTHORIZABLE_JOINER = Joiner.on("->"); private static final Joiner KV_JOINER = Joiner.on("="); - private final TPrivilege privilege_; - private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION; private RolePrivilege(TPrivilege privilege) { privilege_ = privilege; @@ -132,13 +131,16 @@ public class RolePrivilege implements CatalogObject { public String getName() { return privilege_.getPrivilege_name(); } public int getRoleId() { return privilege_.getRole_id(); } @Override - public synchronized long getCatalogVersion() { return catalogVersion_; } - @Override - public synchronized void setCatalogVersion(long newVersion) { - catalogVersion_ = newVersion; + public String getUniqueName() { + return "PRIVILEGE:" + getName().toLowerCase() + "." + Integer.toString(getRoleId()); + } + + public TCatalogObject toTCatalogObject() { + TCatalogObject catalogObject = + new TCatalogObject(getCatalogObjectType(), getCatalogVersion()); + catalogObject.setPrivilege(toThrift()); + return catalogObject; } - @Override - public boolean isLoaded() { return true; } // The time this role was created. Used to quickly check if the same privilege // was dropped and re-created. Assumes a role will not be created + dropped + created http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/Table.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java index 23fa7a4..50fe953 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Table.java +++ b/fe/src/main/java/org/apache/impala/catalog/Table.java @@ -56,13 +56,9 @@ import com.google.common.collect.Maps; * is more general than Hive's CLUSTER BY ... INTO BUCKETS clause (which partitions * a key range into a fixed number of buckets). */ -public abstract class Table implements CatalogObject { +public abstract class Table extends CatalogObjectImpl { private static final Logger LOG = Logger.getLogger(Table.class); - - // Catalog version assigned to this table - private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION; protected org.apache.hadoop.hive.metastore.api.Table msTable_; - protected final Db db_; protected final String name_; protected final String owner_; @@ -358,13 +354,21 @@ public abstract class Table implements CatalogObject { } public TCatalogObject toTCatalogObject() { - TCatalogObject catalogObject = new TCatalogObject(); - catalogObject.setType(getCatalogObjectType()); - catalogObject.setCatalog_version(getCatalogVersion()); + TCatalogObject catalogObject = + new TCatalogObject(getCatalogObjectType(), getCatalogVersion()); catalogObject.setTable(toThrift()); return catalogObject; } + public TCatalogObject toMinimalTCatalogObject() { + TCatalogObject catalogObject = + new TCatalogObject(getCatalogObjectType(), getCatalogVersion()); + catalogObject.setTable(new TTable()); + catalogObject.getTable().setDb_name(getDb().getName()); + catalogObject.getTable().setTbl_name(getName()); + return catalogObject; + } + /** * Gets the ColumnType from the given FieldSchema by using Impala's SqlParser. * Throws a TableLoadingException if the FieldSchema could not be parsed. @@ -396,6 +400,8 @@ public abstract class Table implements CatalogObject { public TableName getTableName() { return new TableName(db_ != null ? db_.getName() : null, name_); } + @Override + public String getUniqueName() { return "TABLE:" + getFullName(); } public ArrayList<Column> getColumns() { return colsByPos_; } @@ -490,17 +496,6 @@ public abstract class Table implements CatalogObject { public TTableStats getTTableStats() { return tableStats_; } public ArrayType getType() { return type_; } - @Override - public long getCatalogVersion() { return catalogVersion_; } - - @Override - public void setCatalogVersion(long catalogVersion) { - catalogVersion_ = catalogVersion; - } - - @Override - public boolean isLoaded() { return true; } - public static boolean isExternalTable( org.apache.hadoop.hive.metastore.api.Table msTbl) { return msTbl.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString()); http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java b/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java new file mode 100644 index 0000000..9d23c4f --- /dev/null +++ b/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java @@ -0,0 +1,152 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.catalog; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.Map; + +import org.apache.log4j.Logger; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; + +// A log of topic update information for each catalog object. An entry is added to +// the log when a catalog object is processed (added/removed/skipped) in a topic +// update and it is replaced every time the catalog object is processed in a +// topic update. +// +// To prevent the log from growing indefinitely, the oldest entries +// (in terms of last topic update that processed the associated catalog objects) are +// garbage collected every TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates. That will cause +// entries of deleted catalog objects or entries of objects that haven't been processed +// by the catalog for at least TOPIC_UPDATE_LOG_GC_FREQUENCY updates to be removed from +// the log. +public class TopicUpdateLog { + private static final Logger LOG = Logger.getLogger(TopicUpdateLog.class); + // Frequency at which the entries of the topic update log are garbage collected. + // An entry may survive for (2 * TOPIC_UPDATE_LOG_GC_FREQUENCY) - 1 topic updates. + private final static int TOPIC_UPDATE_LOG_GC_FREQUENCY = 1000; + + // Number of topic updates left to trigger a gc of topic update log entries. + private int numTopicUpdatesToGc_ = TOPIC_UPDATE_LOG_GC_FREQUENCY; + + // In the next gc cycle of topic update log entries, all the entries that were last + // added to a topic update with version less than or equal to + // 'oldestTopicUpdateToGc_' are removed from the update log. + private long oldestTopicUpdateToGc_ = -1; + + // Represents an entry in the topic update log. A topic update log entry is + // associated with a catalog object and stores information about the last topic update + // that processed that object. + public static class Entry { + // Number of times the entry has skipped a topic update. + private final int numSkippedUpdates_; + // Last version of the corresponding catalog object that was added to a topic + // update. -1 if the object was never added to a topic update. + private final long lastSentVersion_; + // Version of the last topic update to include the corresponding catalog object. + // -1 if the object was never added to a topic update. + private final long lastSentTopicUpdate_; + + Entry() { + numSkippedUpdates_ = 0; + lastSentVersion_ = -1; + lastSentTopicUpdate_ = -1; + } + + Entry(int numSkippedUpdates, long lastSentVersion, long lastSentCatalogUpdate) { + numSkippedUpdates_ = numSkippedUpdates; + lastSentVersion_ = lastSentVersion; + lastSentTopicUpdate_ = lastSentCatalogUpdate; + } + + public int getNumSkippedTopicUpdates() { return numSkippedUpdates_; } + public long getLastSentVersion() { return lastSentVersion_; } + public long getLastSentCatalogUpdate() { return lastSentTopicUpdate_; } + + @Override + public boolean equals(Object other) { + if (this.getClass() != other.getClass()) return false; + Entry entry = (Entry) other; + return numSkippedUpdates_ == entry.getNumSkippedTopicUpdates() + && lastSentVersion_ == entry.getLastSentVersion() + && lastSentTopicUpdate_ == entry.getLastSentCatalogUpdate(); + } + } + + // Entries in the topic update log stored as a map of catalog object keys to + // Entry objects. + private final ConcurrentHashMap<String, Entry> topicLogEntries_ = + new ConcurrentHashMap<>(); + + /** + * Garbage-collects topic update log entries. These are entries that haven't been + * added to any of the last TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates. + */ + public void garbageCollectUpdateLogEntries(long lastTopicUpdateVersion) { + if (oldestTopicUpdateToGc_ == -1) { + oldestTopicUpdateToGc_ = lastTopicUpdateVersion; + return; + } + if (numTopicUpdatesToGc_ == 0) { + if (LOG.isTraceEnabled()) { + LOG.trace("Topic update log GC started."); + } + Preconditions.checkState(oldestTopicUpdateToGc_ > 0); + int numEntriesRemoved = 0; + for (Map.Entry<String, Entry> entry: + topicLogEntries_.entrySet()) { + if (entry.getValue().getLastSentVersion() == -1) continue; + if (entry.getValue().getLastSentCatalogUpdate() <= oldestTopicUpdateToGc_) { + if (topicLogEntries_.remove(entry.getKey(), entry.getValue())) { + ++numEntriesRemoved; + } + } + } + numTopicUpdatesToGc_ = TOPIC_UPDATE_LOG_GC_FREQUENCY; + oldestTopicUpdateToGc_ = lastTopicUpdateVersion; + if (LOG.isTraceEnabled()) { + LOG.trace("Topic update log GC finished. Removed " + numEntriesRemoved + + " entries."); + } + } else { + --numTopicUpdatesToGc_; + } + } + + public void add(String catalogObjectKey, Entry logEntry) { + Preconditions.checkState(!Strings.isNullOrEmpty(catalogObjectKey)); + Preconditions.checkNotNull(logEntry); + topicLogEntries_.put(catalogObjectKey, logEntry); + } + + public Entry get(String catalogObjectKey) { + Preconditions.checkState(!Strings.isNullOrEmpty(catalogObjectKey)); + return topicLogEntries_.get(catalogObjectKey); + } + + // Returns the topic update log entry for the catalog object with key + // 'catalogObjectKey'. If the key does not exist, a newly constructed log entry is + // returned. + public Entry getOrCreateLogEntry(String catalogObjectKey) { + Preconditions.checkState(!Strings.isNullOrEmpty(catalogObjectKey)); + Entry entry = topicLogEntries_.get(catalogObjectKey); + if (entry == null) entry = new Entry(); + return entry; + } +} + http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index b0ed45f..295956c 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -179,7 +179,7 @@ import com.google.common.math.LongMath; * update operations and requires the use of fair table locks to prevent starvation. * * DO { - * Acquire the catalog lock (see CatalogServiceCatalog.catalogLock_) + * Acquire the catalog lock (see CatalogServiceCatalog.versionLock_) * Try to acquire a table lock * IF the table lock acquisition fails { * Release the catalog lock @@ -326,6 +326,15 @@ public class CatalogOpExecutor { ddlRequest.ddl_type); } + // If SYNC_DDL is set, set the catalog update that contains the results of this DDL + // operation. The version of this catalog update is returned to the requesting + // impalad which will wait until this catalog update has been broadcast to all the + // coordinators. + if (ddlRequest.isSync_ddl()) { + response.getResult().setVersion( + catalog_.waitForSyncDdlVersion(response.getResult())); + } + // At this point, the operation is considered successful. If any errors occurred // during execution, this function will throw an exception and the CatalogServer // will handle setting a bad status code. @@ -909,12 +918,15 @@ public class CatalogOpExecutor { String dbName = params.getDb(); Preconditions.checkState(dbName != null && !dbName.isEmpty(), "Null or empty database name passed as argument to Catalog.createDatabase"); - if (params.if_not_exists && catalog_.getDb(dbName) != null) { + Db existingDb = catalog_.getDb(dbName); + if (params.if_not_exists && existingDb != null) { if (LOG.isTraceEnabled()) { LOG.trace("Skipping database creation because " + dbName + " already exists " + "and IF NOT EXISTS was specified."); } - resp.getResult().setVersion(catalog_.getCatalogVersion()); + Preconditions.checkNotNull(existingDb); + resp.getResult().addToUpdated_catalog_objects(existingDb.toTCatalogObject()); + resp.getResult().setVersion(existingDb.getCatalogVersion()); return; } org.apache.hadoop.hive.metastore.api.Database db = @@ -960,11 +972,7 @@ public class CatalogOpExecutor { } Preconditions.checkNotNull(newDb); - TCatalogObject thriftDb = new TCatalogObject( - TCatalogObjectType.DATABASE, Catalog.INITIAL_CATALOG_VERSION); - thriftDb.setDb(newDb.toThrift()); - thriftDb.setCatalog_version(newDb.getCatalogVersion()); - resp.result.addToUpdated_catalog_objects(thriftDb); + resp.result.addToUpdated_catalog_objects(newDb.toTCatalogObject()); } resp.result.setVersion(newDb.getCatalogVersion()); } @@ -1038,22 +1046,18 @@ public class CatalogOpExecutor { throws ImpalaException { if (LOG.isTraceEnabled()) { LOG.trace("Adding DATA SOURCE: " + params.toString()); } DataSource dataSource = DataSource.fromThrift(params.getData_source()); - if (catalog_.getDataSource(dataSource.getName()) != null) { + DataSource existingDataSource = catalog_.getDataSource(dataSource.getName()); + if (existingDataSource != null) { if (!params.if_not_exists) { throw new ImpalaRuntimeException("Data source " + dataSource.getName() + " already exists."); } - // The user specified IF NOT EXISTS and the data source exists, just - // return the current catalog version. - resp.result.setVersion(catalog_.getCatalogVersion()); + resp.result.addToUpdated_catalog_objects(existingDataSource.toTCatalogObject()); + resp.result.setVersion(existingDataSource.getCatalogVersion()); return; } catalog_.addDataSource(dataSource); - TCatalogObject addedObject = new TCatalogObject(); - addedObject.setType(TCatalogObjectType.DATA_SOURCE); - addedObject.setData_source(dataSource.toThrift()); - addedObject.setCatalog_version(dataSource.getCatalogVersion()); - resp.result.addToUpdated_catalog_objects(addedObject); + resp.result.addToUpdated_catalog_objects(dataSource.toTCatalogObject()); resp.result.setVersion(dataSource.getCatalogVersion()); } @@ -1070,11 +1074,7 @@ public class CatalogOpExecutor { resp.result.setVersion(catalog_.getCatalogVersion()); return; } - TCatalogObject removedObject = new TCatalogObject(); - removedObject.setType(TCatalogObjectType.DATA_SOURCE); - removedObject.setData_source(dataSource.toThrift()); - removedObject.setCatalog_version(dataSource.getCatalogVersion()); - resp.result.addToRemoved_catalog_objects(removedObject); + resp.result.addToRemoved_catalog_objects(dataSource.toTCatalogObject()); resp.result.setVersion(dataSource.getCatalogVersion()); } @@ -1229,7 +1229,7 @@ public class CatalogOpExecutor { throw new CatalogException("Database " + db.getName() + " is not empty"); } - TCatalogObject removedObject = new TCatalogObject(); + TCatalogObject removedObject = null; synchronized (metastoreDdlLock_) { // Remove all the Kudu tables of 'db' from the Kudu storage engine. if (db != null && params.cascade) dropTablesFromKudu(db); @@ -1251,11 +1251,9 @@ public class CatalogOpExecutor { for (String tableName: removedDb.getAllTableNames()) { uncacheTable(removedDb.getTable(tableName)); } - removedObject.setCatalog_version(removedDb.getCatalogVersion()); + removedObject = removedDb.toTCatalogObject(); } - removedObject.setType(TCatalogObjectType.DATABASE); - removedObject.setDb(new TDatabase()); - removedObject.getDb().setDb_name(params.getDb()); + Preconditions.checkNotNull(removedObject); resp.result.setVersion(removedObject.getCatalog_version()); resp.result.addToRemoved_catalog_objects(removedObject); } @@ -1525,12 +1523,17 @@ public class CatalogOpExecutor { Preconditions.checkState(params.getColumns() != null, "Null column list given as argument to Catalog.createTable"); - if (params.if_not_exists && - catalog_.containsTable(tableName.getDb(), tableName.getTbl())) { + Table existingTbl = catalog_.getTable(tableName.getDb(), tableName.getTbl(), false); + if (params.if_not_exists && existingTbl != null) { LOG.trace(String.format("Skipping table creation because %s already exists and " + "IF NOT EXISTS was specified.", tableName)); - response.getResult().setVersion(catalog_.getCatalogVersion()); - return false; + existingTbl.getLock().lock(); + try { + addTableToCatalogUpdate(existingTbl, response.getResult()); + return false; + } finally { + existingTbl.getLock().unlock(); + } } org.apache.hadoop.hive.metastore.api.Table tbl = createMetaStoreTable(params); LOG.trace(String.format("Creating table %s", tableName)); @@ -1736,12 +1739,17 @@ public class CatalogOpExecutor { Preconditions.checkState(tblName != null && tblName.isFullyQualified()); Preconditions.checkState(srcTblName != null && srcTblName.isFullyQualified()); - if (params.if_not_exists && - catalog_.containsTable(tblName.getDb(), tblName.getTbl())) { + Table existingTbl = catalog_.getTable(tblName.getDb(), tblName.getTbl(), false); + if (params.if_not_exists && existingTbl != null) { LOG.trace(String.format("Skipping table creation because %s already exists and " + "IF NOT EXISTS was specified.", tblName)); - response.getResult().setVersion(catalog_.getCatalogVersion()); - return; + existingTbl.getLock().lock(); + try { + addTableToCatalogUpdate(existingTbl, response.getResult()); + return; + } finally { + existingTbl.getLock().unlock(); + } } Table srcTable = getExistingTable(srcTblName.getDb(), srcTblName.getTbl()); org.apache.hadoop.hive.metastore.api.Table tbl = @@ -2185,8 +2193,9 @@ public class CatalogOpExecutor { } // Rename the table in the Catalog and get the resulting catalog object. // ALTER TABLE/VIEW RENAME is implemented as an ADD + DROP. - Table newTable = catalog_.renameTable(tableName.toThrift(), newTableName.toThrift()); - if (newTable == null) { + Pair<Table, Table> result = + catalog_.renameTable(tableName.toThrift(), newTableName.toThrift()); + if (result.first == null || result.second == null) { // The rename succeeded in the HMS but failed in the catalog cache. The cache is in // an inconsistent state, but can likely be fixed by running "invalidate metadata". throw new ImpalaRuntimeException(String.format( @@ -2196,14 +2205,9 @@ public class CatalogOpExecutor { newTableName.toString())); } - TCatalogObject addedObject = newTable.toTCatalogObject(); - TCatalogObject removedObject = new TCatalogObject(); - removedObject.setType(TCatalogObjectType.TABLE); - removedObject.setTable(new TTable(tableName.getDb(), tableName.getTbl())); - removedObject.setCatalog_version(addedObject.getCatalog_version()); - response.result.addToRemoved_catalog_objects(removedObject); - response.result.addToUpdated_catalog_objects(addedObject); - response.result.setVersion(addedObject.getCatalog_version()); + response.result.addToRemoved_catalog_objects(result.first.toMinimalTCatalogObject()); + response.result.addToUpdated_catalog_objects(result.second.toTCatalogObject()); + response.result.setVersion(result.second.getCatalogVersion()); } /** @@ -2851,28 +2855,18 @@ public class CatalogOpExecutor { Preconditions.checkNotNull(rolePrivileges); List<TCatalogObject> updatedPrivs = Lists.newArrayList(); for (RolePrivilege rolePriv: rolePrivileges) { - TCatalogObject catalogObject = new TCatalogObject(); - catalogObject.setType(rolePriv.getCatalogObjectType()); - catalogObject.setPrivilege(rolePriv.toThrift()); - catalogObject.setCatalog_version(rolePriv.getCatalogVersion()); - updatedPrivs.add(catalogObject); - } - - // TODO: Currently we only support sending back 1 catalog object in a "direct DDL" - // response. If multiple privileges have been updated, just send back the - // catalog version so subscribers can wait for the statestore heartbeat that - // contains all updates (see IMPALA-5571). - if (updatedPrivs.size() == 1) { + updatedPrivs.add(rolePriv.toTCatalogObject()); + } + + if (!updatedPrivs.isEmpty()) { // If this is a REVOKE statement with hasGrantOpt, only the GRANT OPTION is revoked - // from the privilege. + // from the privileges. Otherwise the privileges are removed from the catalog. if (grantRevokePrivParams.isIs_grant() || privileges.get(0).isHas_grant_opt()) { resp.result.setUpdated_catalog_objects(updatedPrivs); } else { resp.result.setRemoved_catalog_objects(updatedPrivs); } - resp.result.setVersion(updatedPrivs.get(0).getCatalog_version()); - } else if (updatedPrivs.size() > 1) { resp.result.setVersion( updatedPrivs.get(updatedPrivs.size() - 1).getCatalog_version()); } @@ -3027,6 +3021,9 @@ public class CatalogOpExecutor { resp.result.setUpdated_catalog_objects(addedFuncs); resp.result.setRemoved_catalog_objects(removedFuncs); resp.result.setVersion(catalog_.getCatalogVersion()); + for (TCatalogObject removedFn: removedFuncs) { + catalog_.getDeleteLog().addRemovedObject(removedFn); + } } } } else if (req.isSetTable_name()) { @@ -3059,26 +3056,32 @@ public class CatalogOpExecutor { req.getTable_name().getTable_name()); } - if (!dbWasAdded.getRef()) { - // Return the TCatalogObject in the result to indicate this request can be - // processed as a direct DDL operation. - if (tblWasRemoved.getRef()) { - resp.getResult().addToRemoved_catalog_objects(updatedThriftTable); - } else { - resp.getResult().addToUpdated_catalog_objects(updatedThriftTable); - } + // Return the TCatalogObject in the result to indicate this request can be + // processed as a direct DDL operation. + if (tblWasRemoved.getRef()) { + resp.getResult().addToRemoved_catalog_objects(updatedThriftTable); } else { - // Since multiple catalog objects were modified (db and table), don't treat this - // as a direct DDL operation. Set the overall catalog version and the impalad - // will wait for a statestore heartbeat that contains the update. - Preconditions.checkState(!req.isIs_refresh()); + resp.getResult().addToUpdated_catalog_objects(updatedThriftTable); + } + + if (dbWasAdded.getRef()) { + Db addedDb = catalog_.getDb(updatedThriftTable.getTable().getDb_name()); + if (addedDb == null) { + throw new CatalogException("Database " + + updatedThriftTable.getTable().getDb_name() + " was removed by a " + + "concurrent operation. Try invalidating the table again."); + } + resp.getResult().addToUpdated_catalog_objects(addedDb.toTCatalogObject()); } resp.getResult().setVersion(updatedThriftTable.getCatalog_version()); } else { // Invalidate the entire catalog if no table name is provided. Preconditions.checkArgument(!req.isIs_refresh()); - catalog_.reset(); - resp.result.setVersion(catalog_.getCatalogVersion()); + resp.getResult().setVersion(catalog_.reset()); + resp.getResult().setIs_invalidate(true); + } + if (req.isSync_ddl()) { + resp.getResult().setVersion(catalog_.waitForSyncDdlVersion(resp.getResult())); } resp.getResult().setStatus(new TStatus(TErrorCode.OK, new ArrayList<String>())); return resp; @@ -3261,11 +3264,16 @@ public class CatalogOpExecutor { loadTableMetadata(table, newCatalogVersion, true, false, partsToLoadMetadata); addTableToCatalogUpdate(table, response.result); - return response; } finally { Preconditions.checkState(!catalog_.getLock().isWriteLockedByCurrentThread()); table.getLock().unlock(); } + + if (update.isSync_ddl()) { + response.getResult().setVersion( + catalog_.waitForSyncDdlVersion(response.getResult())); + } + return response; } private List<String> getPartValsFromName(org.apache.hadoop.hive.metastore.api.Table http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/service/Frontend.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index c62fc31..d0936d5 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -520,13 +520,17 @@ public class Frontend { } else { throw new IllegalStateException("Unexpected CatalogOp statement type."); } - result.setResult_set_metadata(metadata); + ddl.setSync_ddl(result.getQuery_options().isSync_ddl()); result.setCatalog_op_request(ddl); if (ddl.getOp_type() == TCatalogOpType.DDL) { TCatalogServiceRequestHeader header = new TCatalogServiceRequestHeader(); header.setRequesting_user(analysis.getAnalyzer().getUser().getName()); ddl.getDdl_params().setHeader(header); + ddl.getDdl_params().setSync_ddl(ddl.isSync_ddl()); + } + if (ddl.getOp_type() == TCatalogOpType.RESET_METADATA) { + ddl.getReset_metadata_params().setSync_ddl(ddl.isSync_ddl()); } } http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/service/JniCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java index b56527b..e945a3b 100644 --- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java +++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java @@ -38,7 +38,8 @@ import org.apache.impala.thrift.TCatalogObject; import org.apache.impala.thrift.TDatabase; import org.apache.impala.thrift.TDdlExecRequest; import org.apache.impala.thrift.TFunction; -import org.apache.impala.thrift.TGetAllCatalogObjectsResponse; +import org.apache.impala.thrift.TGetCatalogDeltaResponse; +import org.apache.impala.thrift.TGetCatalogDeltaRequest; import org.apache.impala.thrift.TGetDbsParams; import org.apache.impala.thrift.TGetDbsResult; import org.apache.impala.thrift.TGetFunctionsRequest; @@ -119,9 +120,11 @@ public class JniCatalog { /** * Gets all catalog objects */ - public byte[] getCatalogObjects(long from_version) throws ImpalaException, TException { - TGetAllCatalogObjectsResponse resp = - catalog_.getCatalogObjects(from_version); + public byte[] getCatalogDelta(byte[] thriftGetCatalogDeltaReq) + throws ImpalaException, TException { + TGetCatalogDeltaRequest params = new TGetCatalogDeltaRequest(); + JniUtil.deserializeThrift(protocolFactory_, params, thriftGetCatalogDeltaReq); + TGetCatalogDeltaResponse resp = catalog_.getCatalogDelta(params.getFrom_version()); TSerializer serializer = new TSerializer(protocolFactory_); return serializer.serialize(resp); } http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/util/SentryProxy.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/util/SentryProxy.java b/fe/src/main/java/org/apache/impala/util/SentryProxy.java index 23534d2..f2df66b 100644 --- a/fe/src/main/java/org/apache/impala/util/SentryProxy.java +++ b/fe/src/main/java/org/apache/impala/util/SentryProxy.java @@ -87,7 +87,7 @@ public class SentryProxy { } sentryPolicyService_ = new SentryPolicyService(sentryConfig); - policyReader_.scheduleAtFixedRate(new PolicyReader(), 0, + policyReader_.scheduleAtFixedRate(new PolicyReader(false), 0, BackendConfig.INSTANCE.getSentryCatalogPollingFrequency(), TimeUnit.SECONDS); } @@ -107,6 +107,12 @@ public class SentryProxy { * atomically. */ private class PolicyReader implements Runnable { + private boolean resetVersions_; + + public PolicyReader(boolean resetVersions) { + resetVersions_ = resetVersions; + } + public void run() { synchronized (SentryProxy.this) { // Assume all roles should be removed. Then query the Policy Service and remove @@ -131,6 +137,9 @@ public class SentryProxy { if (existingRole != null && existingRole.getGrantGroups().equals(grantGroups)) { role = existingRole; + if (resetVersions_) { + role.setCatalogVersion(catalog_.incrementAndGetCatalogVersion()); + } } else { role = catalog_.addRole(sentryRole.getRoleName(), grantGroups); } @@ -160,6 +169,10 @@ public class SentryProxy { // We already know about this privilege (privileges cannot be modified). if (existingPriv != null && existingPriv.getCreateTimeMs() == sentryPriv.getCreateTime()) { + if (resetVersions_) { + existingPriv.setCatalogVersion( + catalog_.incrementAndGetCatalogVersion()); + } continue; } catalog_.addRolePrivilege(role.getName(), thriftPriv); @@ -302,10 +315,7 @@ public class SentryProxy { // Update the catalog for (TPrivilege privilege: privileges) { RolePrivilege rolePriv = catalog_.removeRolePrivilege(roleName, privilege); - if (rolePriv == null) { - rolePriv = RolePrivilege.fromThrift(privilege); - rolePriv.setCatalogVersion(catalog_.getCatalogVersion()); - } + if (rolePriv == null) continue; rolePrivileges.add(rolePriv); } } else { @@ -317,12 +327,7 @@ public class SentryProxy { List<TPrivilege> updatedPrivileges = Lists.newArrayList(); for (TPrivilege privilege: privileges) { RolePrivilege existingPriv = catalog_.getRolePrivilege(roleName, privilege); - if (existingPriv == null) { - RolePrivilege rolePriv = RolePrivilege.fromThrift(privilege); - rolePriv.setCatalogVersion(catalog_.getCatalogVersion()); - rolePrivileges.add(rolePriv); - continue; - } + if (existingPriv == null) continue; TPrivilege updatedPriv = existingPriv.toThrift(); updatedPriv.setHas_grant_opt(false); updatedPrivileges.add(updatedPriv); @@ -342,9 +347,9 @@ public class SentryProxy { * the Catalog with any changes. Throws an ImpalaRuntimeException if there are any * errors executing the refresh job. */ - public void refresh() throws ImpalaRuntimeException { + public void refresh(boolean resetVersions) throws ImpalaRuntimeException { try { - policyReader_.submit(new PolicyReader()).get(); + policyReader_.submit(new PolicyReader(resetVersions)).get(); } catch (Exception e) { // We shouldn't make it here. It means an exception leaked from the // AuthorizationPolicyReader. http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java index 93e4af0..df2ba0d 100644 --- a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java +++ b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java @@ -38,7 +38,7 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog { // Cache pools are typically loaded asynchronously, but as there is no fixed execution // order for tests, the cache pools are loaded synchronously before the tests are // executed. - CachePoolReader rd = new CachePoolReader(); + CachePoolReader rd = new CachePoolReader(false); rd.run(); } http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java index fdc64e6..7e8ff46 100644 --- a/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java +++ b/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java @@ -65,9 +65,7 @@ public class ImpaladTestCatalog extends ImpaladCatalog { /** * Reloads all metadata from the source catalog. */ - public void reset() throws CatalogException { - srcCatalog_.reset(); - } + public void reset() throws CatalogException { srcCatalog_.reset(); } /** * Overrides ImpaladCatalog.getTable to load the table metadata if it is missing. http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/tests/statestore/test_statestore.py ---------------------------------------------------------------------- diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py index e2b1715..1003dc7 100644 --- a/tests/statestore/test_statestore.py +++ b/tests/statestore/test_statestore.py @@ -311,14 +311,12 @@ class StatestoreSubscriber(object): class TestStatestore(): def make_topic_update(self, topic_name, key_template="foo", value_template="bar", - num_updates=1, deletions=None): + num_updates=1): topic_entries = [ Subscriber.TTopicItem(key=key_template + str(x), value=value_template + str(x)) for x in xrange(num_updates)] - if deletions is None: deletions = [] return Subscriber.TTopicDelta(topic_name=topic_name, topic_entries=topic_entries, - topic_deletions=deletions, is_delta=False) def test_registration_ids_different(self): @@ -349,11 +347,9 @@ class TestStatestore(): assert len(args.topic_deltas) == 1 assert args.topic_deltas[topic_name].topic_entries == delta.topic_entries assert args.topic_deltas[topic_name].topic_name == delta.topic_name - assert args.topic_deltas[topic_name].topic_deletions == delta.topic_deletions elif sub.update_count == 3: # After the content-bearing update was processed, the next delta should be empty assert len(args.topic_deltas[topic_name].topic_entries) == 0 - assert len(args.topic_deltas[topic_name].topic_deletions) == 0 return DEFAULT_UPDATE_STATE_RESPONSE @@ -461,7 +457,7 @@ class TestStatestore(): assert len(args.topic_deltas[persistent_topic_name].topic_entries) == 1 # Statestore should not send deletions when the update is not a delta, see # IMPALA-1891 - assert len(args.topic_deltas[transient_topic_name].topic_deletions) == 0 + assert args.topic_deltas[persistent_topic_name].topic_entries[0].deleted == False return DEFAULT_UPDATE_STATE_RESPONSE reg = [TTopicRegistration(topic_name=persistent_topic_name, is_transient=False),
