This is an automated email from the ASF dual-hosted git repository. tarmstrong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 2fccd82590d747d834b8be6f3b05bb446d9bac12 Author: Vihang Karajgaonkar <[email protected]> AuthorDate: Mon Oct 12 10:38:23 2020 -0700 IMPALA-6671: Skip locked tables from topic updates This change adds a mechanism for topic-update thread to skip a table which is locked for more than a configurable interval from the topic updates. This is especially useful in scenarios where long running operations on a locked table (refresh, recover partitions, compute stats) block the topic update thread. This causes unrelated queries which are waiting on metadata via topic updates (catalog-v1 mode) to unnecessarily block. The ideal solution of this problem would be to make HdfsTable immutable so that there is no need for table lock. But that is large change and not easily portable to older releases of Impala. It would be taken up as a separate patch. This change introduces 2 new configurations for catalogd: 1. topic_update_tbl_max_wait_time_ms: This defines the maximum time in msecs the topic update thread waits on a locked table before skipping the table from that iteration of topic updates. The default value is 500. If this configuration is set to 0 the lock with timeout for topic update thread is disabled. 2. catalog_max_lock_skipped_topic_updates: This defines the maximum number of distinct lock operations which are skipped by topic update thread due to lock contention. Once this limit is reached, topic update thread will block until it acquires the table lock and adds it to the updates. Testing: 1. Added a test case which introduces a simulated delay in a few potentially long running statements. This causes the table to be locked for a long time. The topic update thread skips that table from updates and unrelated queries are unblocked since they receive the required metadata from updates. 2. Added a test where multiple threads run blocking statements in a loop to stress the table lock. 
It makes sure that topic update thread is not starved and eventually blocks on table lock by hitting the limit defined by catalog_max_lock_skipped_topic_updates. 3. Ran exhaustive tests with default configurations. Change-Id: Ic657b96edbcdc94c6b906e7ca59291f4e4715655 Reviewed-on: http://gerrit.cloudera.org:8080/16549 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/catalog/catalog-server.cc | 15 + be/src/util/backend-gflag-util.cc | 5 + common/thrift/BackendGflags.thrift | 4 + .../apache/impala/analysis/CopyTestCaseStmt.java | 4 +- .../java/org/apache/impala/catalog/Catalog.java | 4 +- .../impala/catalog/CatalogServiceCatalog.java | 315 ++++++++++++++++----- .../org/apache/impala/catalog/HdfsPartition.java | 6 +- .../java/org/apache/impala/catalog/HdfsTable.java | 74 +++++ .../main/java/org/apache/impala/catalog/Table.java | 72 ++++- .../org/apache/impala/catalog/TopicUpdateLog.java | 18 +- .../apache/impala/service/CatalogOpExecutor.java | 109 ++++--- .../catalog/CatalogObjectToFromThriftTest.java | 4 +- .../custom_cluster/test_topic_update_frequency.py | 213 ++++++++++++++ 13 files changed, 696 insertions(+), 147 deletions(-) diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc index a66e256..e14c982 100644 --- a/be/src/catalog/catalog-server.cc +++ b/be/src/catalog/catalog-server.cc @@ -74,6 +74,21 @@ DEFINE_int64_hidden(catalog_partial_fetch_rpc_queue_timeout_s, LLONG_MAX, "Maxim "(in seconds) a partial catalog object fetch RPC spends in the queue waiting " "to run. Must be set to a value greater than zero."); +DEFINE_int32(catalog_max_lock_skipped_topic_updates, 2, "Maximum number of topic " + "updates skipped for a table due to lock contention in catalogd after which it must" + "be added to the topic the update log. 
This limit only applies to distinct lock " + "operations which block the topic update thread."); + +DEFINE_int64(topic_update_tbl_max_wait_time_ms, 500, "Maximum time " + "(in milliseconds) catalog's topic update thread will wait to acquire lock on " + "table. If the topic update thread cannot acquire a table lock it skips the table " + "from that topic update and processes the table in the next update. However to " + "prevent starvation it only skips the table catalog_max_lock_skipped_topic_updates " + "many times. After that limit is hit, topic thread block until it acquires the " + "table lock. A value of 0 disables the timeout based locking which means topic " + "update thread will always block until table lock is acquired."); + + DECLARE_string(debug_actions); DECLARE_string(state_store_host); DECLARE_int32(state_store_subscriber_port); diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc index b43c63e..754e9f2 100644 --- a/be/src/util/backend-gflag-util.cc +++ b/be/src/util/backend-gflag-util.cc @@ -85,6 +85,8 @@ DECLARE_bool(use_customized_user_groups_mapper_for_ranger); DECLARE_bool(enable_column_masking); DECLARE_bool(compact_catalog_topic); DECLARE_bool(enable_incremental_metadata_updates); +DECLARE_int64(topic_update_tbl_max_wait_time_ms); +DECLARE_int32(catalog_max_lock_skipped_topic_updates); namespace impala { @@ -173,6 +175,9 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) { cfg.__set_compact_catalog_topic(FLAGS_compact_catalog_topic); cfg.__set_enable_incremental_metadata_updates( FLAGS_enable_incremental_metadata_updates); + cfg.__set_topic_update_tbl_max_wait_time_ms(FLAGS_topic_update_tbl_max_wait_time_ms); + cfg.__set_catalog_max_lock_skipped_topic_updates( + FLAGS_catalog_max_lock_skipped_topic_updates); RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes)); return Status::OK(); } diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift index 
89ce733..bd9d4b9 100644 --- a/common/thrift/BackendGflags.thrift +++ b/common/thrift/BackendGflags.thrift @@ -161,4 +161,8 @@ struct TBackendGflags { 68: required bool compact_catalog_topic 69: required bool enable_incremental_metadata_updates + + 70: required i64 topic_update_tbl_max_wait_time_ms + + 71: required i32 catalog_max_lock_skipped_topic_updates } diff --git a/fe/src/main/java/org/apache/impala/analysis/CopyTestCaseStmt.java b/fe/src/main/java/org/apache/impala/analysis/CopyTestCaseStmt.java index 69adb81..f6aba15 100644 --- a/fe/src/main/java/org/apache/impala/analysis/CopyTestCaseStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/CopyTestCaseStmt.java @@ -173,11 +173,11 @@ public class CopyTestCaseStmt extends StatementBase { } for (FeTable table: referencedTbls) { Preconditions.checkState(table instanceof FeTable); - ((Table) table).getLock().lock(); + ((Table) table).takeReadLock(); try { result.addToTables_and_views(((Table) table).toThrift()); } finally { - ((Table) table).getLock().unlock(); + ((Table) table).releaseReadLock(); } } return result; diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java index 1294d82..d232f0a 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java @@ -505,13 +505,13 @@ public abstract class Catalog implements AutoCloseable { throw new CatalogException("Table not found: " + objectDesc.getTable().getTbl_name()); } - table.getLock().lock(); + table.takeReadLock(); try { result.setType(table.getCatalogObjectType()); result.setCatalog_version(table.getCatalogVersion()); result.setTable(table.toThrift()); } finally { - table.getLock().unlock(); + table.releaseReadLock(); } break; } diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index 23473e6..704bac4 100644 --- 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -35,6 +35,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; @@ -206,8 +207,10 @@ public class CatalogServiceCatalog extends Catalog { private static final int INITIAL_META_STORE_CLIENT_POOL_SIZE = 10; private static final int MAX_NUM_SKIPPED_TOPIC_UPDATES = 2; - // Timeout for acquiring a table lock - // TODO: Make this configurable + private final int maxSkippedUpdatesLockContention_; + //value of timeout for the topic update thread while waiting on the table lock. + private final long topicUpdateTblLockMaxWaitTimeMs_; + //Default value of timeout for acquiring a table lock. 
private static final long LOCK_RETRY_TIMEOUT_MS = 7200000; // Time to sleep before retrying to acquire a table lock private static final int LOCK_RETRY_DELAY_MS = 10; @@ -315,6 +318,14 @@ public class CatalogServiceCatalog extends Catalog { BackendConfig.INSTANCE.getBlacklistedDbs(), LOG); blacklistedTables_ = CatalogBlacklistUtils.parseBlacklistedTables( BackendConfig.INSTANCE.getBlacklistedTables(), LOG); + maxSkippedUpdatesLockContention_ = BackendConfig.INSTANCE + .getBackendCfg().catalog_max_lock_skipped_topic_updates; + Preconditions.checkState(maxSkippedUpdatesLockContention_ > 0, + "catalog_max_lock_skipped_topic_updates must be positive"); + topicUpdateTblLockMaxWaitTimeMs_ = BackendConfig.INSTANCE + .getBackendCfg().topic_update_tbl_max_wait_time_ms; + Preconditions.checkState(topicUpdateTblLockMaxWaitTimeMs_ >= 0, + "topic_update_tbl_max_wait_time_ms must be positive"); catalogServiceId_ = catalogServiceId; tableLoadingMgr_ = new TableLoadingMgr(this, numLoadingThreads); loadInBackground_ = loadInBackground; @@ -426,30 +437,45 @@ public class CatalogServiceCatalog extends Catalog { * held when the function returns. Returns false otherwise and no lock is held in * this case. */ - public boolean tryLockTable(Table tbl) { + public boolean tryWriteLock(Table tbl) { + return tryLock(tbl, true, LOCK_RETRY_TIMEOUT_MS); + } + + /** + * Tries to acquire the table similar to described in + * {@link CatalogServiceCatalog#tryWriteLock(Table)} but with a custom timeout. + */ + public boolean tryLock(Table tbl, final boolean useWriteLock, final long timeout) { + Preconditions.checkArgument(timeout > 0); try (ThreadNameAnnotator tna = new ThreadNameAnnotator( - "Attempting to lock table " + tbl.getFullName())) { + String.format("Attempting to %s lock table %s with a timeout of %s ms", + (useWriteLock ? 
"write" : "read"), tbl.getFullName(), timeout))) { long begin = System.currentTimeMillis(); long end; do { versionLock_.writeLock().lock(); - if (tbl.getLock().tryLock()) { - if (LOG.isTraceEnabled()) { - end = System.currentTimeMillis(); - LOG.trace(String.format("Lock for table %s was acquired in %d msec", - tbl.getFullName(), end - begin)); - } - return true; - } - versionLock_.writeLock().unlock(); + Lock lock = useWriteLock ? tbl.writeLock() : tbl.readLock(); try { + //Note that we don't use the timeout directly here since the timeout + //since we don't want to unnecessarily hold the versionLock if the table + //cannot be acquired. Holding version lock can potentially blocks other + //unrelated DDL operations. + if (lock.tryLock(0, TimeUnit.SECONDS)) { + if (LOG.isTraceEnabled()) { + end = System.currentTimeMillis(); + LOG.trace(String.format("Lock for table %s was acquired in %d msec", + tbl.getFullName(), end - begin)); + } + return true; + } + versionLock_.writeLock().unlock(); // Sleep to avoid spinning and allow other operations to make progress. Thread.sleep(LOCK_RETRY_DELAY_MS); } catch (InterruptedException e) { // ignore } end = System.currentTimeMillis(); - } while (end - begin < LOCK_RETRY_TIMEOUT_MS); + } while (end - begin < timeout); return false; } } @@ -638,7 +664,7 @@ public class CatalogServiceCatalog extends Catalog { Map<String, ByteBuffer> stats = new HashMap<>(); HdfsTable hdfsTable = (HdfsTable) table; - hdfsTable.getLock().lock(); + hdfsTable.takeReadLock(); try { Collection<? 
extends PrunablePartition> partitions = hdfsTable.getPartitions(); for (PrunablePartition partition : partitions) { @@ -652,7 +678,7 @@ public class CatalogServiceCatalog extends Catalog { } } } finally { - hdfsTable.getLock().unlock(); + hdfsTable.releaseReadLock(); } LOG.info("Fetched partition statistics for " + stats.size() + " partitions on: " + hdfsTable.getFullName()); @@ -694,7 +720,7 @@ public class CatalogServiceCatalog extends Catalog { String key = Catalog.toCatalogObjectKey(obj); if (obj.type != TCatalogObjectType.CATALOG) { topicUpdateLog_.add(key, - new TopicUpdateLog.Entry(0, obj.getCatalog_version(), toVersion)); + new TopicUpdateLog.Entry(0, obj.getCatalog_version(), toVersion, 0)); if (!delete) updatedCatalogObjects.add(key); } // TODO: TSerializer.serialize() returns a copy of the internal byte array, which @@ -983,7 +1009,7 @@ public class CatalogServiceCatalog extends Catalog { } // we should acquire the table lock so that we wait for any other updates // happening to this table at the same time - if (!tryLockTable(tbl)) { + if (!tryWriteLock(tbl)) { throw new CatalogException(String.format("Error during self-event evaluation " + "for table %s due to lock contention", tbl.getFullName())); } @@ -1024,7 +1050,7 @@ public class CatalogServiceCatalog extends Catalog { return failingPartitions.isEmpty(); } } finally { - tbl.getLock().unlock(); + tbl.releaseWriteLock(); } return false; } @@ -1279,24 +1305,162 @@ public class CatalogServiceCatalog extends Catalog { */ private void addTableToCatalogDelta(Table tbl, GetCatalogDeltaContext ctx) throws TException { - if (tbl.getCatalogVersion() <= ctx.toVersion) { - addTableToCatalogDeltaHelper(tbl, ctx); + TopicUpdateLog.Entry topicUpdateEntry = + topicUpdateLog_.getOrCreateLogEntry(tbl.getUniqueName()); + Preconditions.checkNotNull(topicUpdateEntry); + // it is important to get the table version once and then use it for following logic + // since we don't have the table lock yet and the table could be 
changed during the + // execution below. + final long tblVersion = tbl.getCatalogVersion(); + if (tblVersion <= ctx.toVersion) { + // if we have already skipped this table due to lock contention + // maxSkippedUpdatesLockContention number of times, we must block until we add it + // to the topic updates. Otherwise, we can attempt to take a lock with a timeout. + // if the topicUpdateTblLockMaxWaitTimeMs is set to 0, it means the lock timeout + // is disabled and we should just block here until lock is acquired. + boolean lockWithTimeout = topicUpdateTblLockMaxWaitTimeMs_ > 0 + && topicUpdateEntry.getNumSkippedUpdatesLockContention() + < maxSkippedUpdatesLockContention_; + if (topicUpdateTblLockMaxWaitTimeMs_ > 0 && !lockWithTimeout) { + LOG.warn("Topic update thread blocking until lock is acquired for table {}", + tbl.getFullName()); + } + lockTableAndAddToCatalogDelta(tblVersion, tbl, ctx, lockWithTimeout); } else { - TopicUpdateLog.Entry topicUpdateEntry = - topicUpdateLog_.getOrCreateLogEntry(tbl.getUniqueName()); - Preconditions.checkNotNull(topicUpdateEntry); + // tbl is outside the current topic update window. For a fast changing table, it is + // possible that this tbl is starved and never added to topic updates. Hence we + // see how many times the table was skipped before. 
if (topicUpdateEntry.getNumSkippedTopicUpdates() == MAX_NUM_SKIPPED_TOPIC_UPDATES) { - addTableToCatalogDeltaHelper(tbl, ctx); + LOG.warn("Topic update thread blocking until lock is acquired for table {} " + + "since the table was already skipped {} number of times", + tbl.getFullName(), MAX_NUM_SKIPPED_TOPIC_UPDATES); + lockTableAndAddToCatalogDelta(tblVersion, tbl, ctx, false); } else { LOG.info("Table {} (version={}) is skipping topic update ({}, {}]", - tbl.getFullName(), tbl.getCatalogVersion(), ctx.fromVersion, ctx.toVersion); + tbl.getFullName(), tblVersion, ctx.fromVersion, ctx.toVersion); topicUpdateLog_.add(tbl.getUniqueName(), new TopicUpdateLog.Entry( topicUpdateEntry.getNumSkippedTopicUpdates() + 1, topicUpdateEntry.getLastSentVersion(), - topicUpdateEntry.getLastSentCatalogUpdate())); + topicUpdateEntry.getLastSentCatalogUpdate(), + topicUpdateEntry.getNumSkippedUpdatesLockContention())); + } + } + } + + /** + * This method takes a lock on the table and adds it to the + * {@link GetCatalogDeltaContext} which is eventually sent via the topic updates. A lock + * on table essentially blocks other concurrent catalog operations on the table. Also, + * if the table is a {@link HdfsTable} and it is already locked, this method may or may + * not block until the table lock is acquired depending on whether lockWithTimeout + * parameter is false or not. + * When the lockWithTimeout is true this method attempts to acquire a lock with a + * timeout specified by {@code topicUpdateTblLockMaxWaitTimeMs}. If the lock is acquired + * within the timeout, it continues ahead and adds the table to the ctx. However, if + * the lock is not acquired within the timeout, it skips the table and increments + * the counter in {@link TopicUpdateLog.Entry}. + * @param tblVersion The table version at the time when topic update evaluates if the + * table needs to be in this update or not. + * @param tbl The table object which needs to added to the topic update. 
+ * @param ctx GetCatalogDeltaContext where the table is added if the lock is acquired. + * @param lockWithTimeout If this is true, the method attempts to lock with a timeout + * else, it blocks until lock is acquired. + */ + private void lockTableAndAddToCatalogDelta(final long tblVersion, Table tbl, + GetCatalogDeltaContext ctx, boolean lockWithTimeout) throws TException { + Stopwatch sw = Stopwatch.createStarted(); + if (tbl instanceof HdfsTable && lockWithTimeout) { + if (!lockHdfsTblWithTimeout(tblVersion, (HdfsTable) tbl, ctx)) return; + } else { + // this is not HdfsTable or lockWithTimeout is false. + // We block until table read lock is acquired. + tbl.takeReadLock(); + } + long elapsedTime = sw.stop().elapsed(TimeUnit.MILLISECONDS); + if (elapsedTime > 2000) { + LOG.debug("Time taken to acquire read lock on table {} for topic update {} ms", + tbl.getFullName(), elapsedTime); + } + try { + addTableToCatalogDeltaHelper(tbl, ctx); + } finally { + tbl.releaseReadLock(); + } + } + + /** + * Attempts to take a read lock on the given HdfsTable within a configurable timeout + * of {@code topicUpdateTblLockMaxWaitTimeMs}. + * @param tblVersion The version of the table when topic update thread inspects the + * table to be added to the catalog topic updates. + * @param hdfsTable The table to be read locked. + * @param ctx The current {@link GetCatalogDeltaContext} for this topic update. + * @return true if the table was successfully read-locked, false otherwise. + */ + private boolean lockHdfsTblWithTimeout(long tblVersion, HdfsTable hdfsTable, + GetCatalogDeltaContext ctx) { + // see the comment below on why we need 2 attempts. + final int maxAttempts = 2; + int attemptCount = 0; + boolean lockAcquired; + do { + attemptCount++; + // topicUpdateTblLockMaxWaitTimeMs indicates the total amount of time we are willing + // to wait to acquire the table lock. 
We make 2 attempts and hence the timeout here + // is topicUpdateTblLockMaxWaitTimeMs/2 so that overall the method waits for + // maximum of topicUpdateTblLockMaxWaitTimeMs for the lock. + long timeoutMs = topicUpdateTblLockMaxWaitTimeMs_ /maxAttempts; + lockAcquired = tryLock(hdfsTable, false, timeoutMs); + if (lockAcquired) { + // table lock was successfully acquired. We can now release the versionLock. + versionLock_.writeLock().unlock(); + return true; } + // If we reach here, the topic update thread could not take a lock on this table. + // We should bump up the pending table version so that this gets included in the + // next topic update. However, there is a race condition which needs to be handled + // here. If we update the pending version here + // after hdfsTable.setCatalogVersion() is called from CatalogOpExecutor, the bump up + // of pendingVersion takes no effect. Hence we detect such case by comparing the + // tbl version with the expected tblVersion. if the tblVersion does not match, it + // means that the setCatalogVersion has already been called and there is no point + // in bumping up the pending version now. We retry to take a lock on the tbl again + // in such case. If we cannot get a read lock in attempt 2, it means that we were + // unlucky again and some other write operation has acquired the tbl lock. In such + // a case it is guaranteed that the tbl version will be updated outside current + // window of the topic updates and it is safe to skip the table. + boolean pendingVersionUpdated = hdfsTable + .updatePendingVersion(tblVersion, incrementAndGetCatalogVersion()); + if (pendingVersionUpdated) break; + // if pendingVersionUpdated is false it means that tblVersion has been changed + // and hence we didn't update the pendingVersion. We retry once to acquire a read + // lock. + } while (attemptCount != maxAttempts); + // lock could not be acquired, we update the skip count in the topicUpdate entry + // if applicable. 
+ TopicUpdateLog.Entry topicUpdateEntry = topicUpdateLog_ + .getOrCreateLogEntry(hdfsTable.getUniqueName()); + LOG.info( + "Table {} (version={}, lastSeen={}) is skipping topic update ({}, {}] " + + "due to lock contention", hdfsTable.getFullName(), tblVersion, + hdfsTable.getLastVersionSeenByTopicUpdate(), ctx.fromVersion, ctx.toVersion); + if (hdfsTable.getLastVersionSeenByTopicUpdate() != tblVersion) { + // if the last version skipped by topic update is not same as the last version + // sent, it means the table was updated and topic update thread is lagging + // behind. + topicUpdateLog_.add(hdfsTable.getUniqueName(), + new TopicUpdateLog.Entry( + topicUpdateEntry.getNumSkippedTopicUpdates(), + topicUpdateEntry.getLastSentVersion(), + topicUpdateEntry.getLastSentCatalogUpdate(), + topicUpdateEntry.getNumSkippedUpdatesLockContention() + 1)); + // we keep track of the table version when topic update thread had to skip the + // table from updates so that next iteration can determine if we need to + // increment the lock contention counter in topic update entry again. 
+ hdfsTable.setLastVersionSeenByTopicUpdate(tblVersion); } + return false; } /** @@ -1306,44 +1470,47 @@ public class CatalogServiceCatalog extends Catalog { */ private void addTableToCatalogDeltaHelper(Table tbl, GetCatalogDeltaContext ctx) throws TException { + Preconditions.checkState(tbl.isReadLockedByCurrentThread(), + "Topic update thread does not hold a lock on table " + tbl.getFullName() + + " while generating catalog delta"); TCatalogObject catalogTbl = new TCatalogObject(TABLE, Catalog.INITIAL_CATALOG_VERSION); - tbl.getLock().lock(); + long tblVersion = tbl.getCatalogVersion(); + if (tblVersion <= ctx.fromVersion) { + LOG.trace("Table {} version {} skipping the update ({}, {}]", + tbl.getFullName(), tbl.getCatalogVersion(), ctx.fromVersion, ctx.toVersion); + return; + } + String tableUniqueName = tbl.getUniqueName(); + TopicUpdateLog.Entry topicUpdateEntry = + topicUpdateLog_.getOrCreateLogEntry(tableUniqueName); + if (tblVersion > ctx.toVersion && + topicUpdateEntry.getNumSkippedTopicUpdates() < MAX_NUM_SKIPPED_TOPIC_UPDATES) { + LOG.info("Table " + tbl.getFullName() + " is skipping topic update " + + ctx.toVersion); + topicUpdateLog_.add(tableUniqueName, + new TopicUpdateLog.Entry( + topicUpdateEntry.getNumSkippedTopicUpdates() + 1, + topicUpdateEntry.getLastSentVersion(), + topicUpdateEntry.getLastSentCatalogUpdate(), + topicUpdateEntry.getNumSkippedUpdatesLockContention())); + return; + } try { - long tblVersion = tbl.getCatalogVersion(); - if (tblVersion <= ctx.fromVersion) return; - String tableUniqueName = tbl.getUniqueName(); - TopicUpdateLog.Entry topicUpdateEntry = - topicUpdateLog_.getOrCreateLogEntry(tableUniqueName); - if (tblVersion > ctx.toVersion && - topicUpdateEntry.getNumSkippedTopicUpdates() < MAX_NUM_SKIPPED_TOPIC_UPDATES) { - LOG.info("Table " + tbl.getFullName() + " is skipping topic update " + - ctx.toVersion); - topicUpdateLog_.add(tableUniqueName, - new TopicUpdateLog.Entry( - topicUpdateEntry.getNumSkippedTopicUpdates() + 1, - 
topicUpdateEntry.getLastSentVersion(), - topicUpdateEntry.getLastSentCatalogUpdate())); - return; - } - try { - if (BackendConfig.INSTANCE.isIncrementalMetadataUpdatesEnabled() - && tbl instanceof HdfsTable) { - catalogTbl.setTable(((HdfsTable) tbl).toThriftWithMinimalPartitions()); - addHdfsPartitionsToCatalogDelta((HdfsTable) tbl, ctx); - } else { - catalogTbl.setTable(tbl.toThrift()); - } - } catch (Exception e) { - LOG.error(String.format("Error calling toThrift() on table %s: %s", - tbl.getFullName(), e.getMessage()), e); - return; + if (BackendConfig.INSTANCE.isIncrementalMetadataUpdatesEnabled() + && tbl instanceof HdfsTable) { + catalogTbl.setTable(((HdfsTable) tbl).toThriftWithMinimalPartitions()); + addHdfsPartitionsToCatalogDelta((HdfsTable) tbl, ctx); + } else { + catalogTbl.setTable(tbl.toThrift()); } - catalogTbl.setCatalog_version(tbl.getCatalogVersion()); - ctx.addCatalogObject(catalogTbl, false); - } finally { - tbl.getLock().unlock(); + } catch (Exception e) { + LOG.error(String.format("Error calling toThrift() on table %s: %s", + tbl.getFullName(), e.getMessage()), e); + return; } + catalogTbl.setCatalog_version(tbl.getCatalogVersion()); + ctx.addCatalogObject(catalogTbl, false); } private void addHdfsPartitionsToCatalogDelta(HdfsTable hdfsTable, @@ -2064,6 +2231,11 @@ public class CatalogServiceCatalog extends Catalog { deleteLog_.addRemovedObject(existingTbl.toMinimalTCatalogObject()); } updatedTbl.setCatalogVersion(incrementAndGetCatalogVersion()); + // note that we update the db with a new instance of table. In such case + // we may lose some temporary state stored in the old table like pendingVersion + // This is okay since the table version is bumped up to the next topic update window + // and eventually the table will need to added to the topic update if hits the + // MAX_NUM_SKIPPED_TOPIC_UPDATES limit. 
db.addTable(updatedTbl); return updatedTbl; } finally { @@ -2297,7 +2469,7 @@ public class CatalogServiceCatalog extends Catalog { Preconditions.checkState(!(tbl instanceof IncompleteTable)); String dbName = tbl.getDb().getName(); String tblName = tbl.getName(); - if (!tryLockTable(tbl)) { + if (!tryWriteLock(tbl)) { throw new CatalogException(String.format("Error refreshing metadata for table " + "%s due to lock contention", tbl.getFullName())); } @@ -2328,7 +2500,7 @@ public class CatalogServiceCatalog extends Catalog { } finally { context.stop(); Preconditions.checkState(!versionLock_.isWriteLockedByCurrentThread()); - tbl.getLock().unlock(); + tbl.releaseWriteLock(); } } @@ -2340,7 +2512,7 @@ public class CatalogServiceCatalog extends Catalog { throws CatalogException { Preconditions.checkNotNull(tbl); Preconditions.checkNotNull(partitionSet); - Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread()); + Preconditions.checkState(tbl.isWriteLockedByCurrentThread()); if (!(tbl instanceof HdfsTable)) { throw new CatalogException("Table " + tbl.getFullName() + " is not an Hdfs table"); } @@ -2415,11 +2587,11 @@ public class CatalogServiceCatalog extends Catalog { Table result = removeTable(dbName, tblName); if (result == null) return null; tblWasRemoved.setRef(true); - result.getLock().lock(); + result.takeReadLock(); try { return result.toTCatalogObject(); } finally { - result.getLock().unlock(); + result.releaseReadLock(); } } @@ -2844,7 +3016,7 @@ public class CatalogServiceCatalog extends Catalog { public TCatalogObject reloadPartition(Table tbl, List<TPartitionKeyValue> partitionSpec, Reference<Boolean> wasPartitionReloaded, CatalogObject.ThriftObjectType resultType, String reason) throws CatalogException { - if (!tryLockTable(tbl)) { + if (!tryWriteLock(tbl)) { throw new CatalogException(String.format("Error reloading partition of table %s " + "due to lock contention", tbl.getFullName())); } @@ -2894,7 +3066,7 @@ public class CatalogServiceCatalog 
extends Catalog { return hdfsTable.toTCatalogObject(resultType); } finally { Preconditions.checkState(!versionLock_.isWriteLockedByCurrentThread()); - tbl.getLock().unlock(); + tbl.releaseWriteLock(); } } @@ -3111,7 +3283,7 @@ public class CatalogServiceCatalog extends Catalog { ". Table not yet loaded."; return result; } - if (!tbl.getLock().tryLock()) { + if (!tbl.tryReadLock()) { result = "Metrics for table " + dbName + "." + tblName + "are not available " + "because the table is currently modified by another operation."; return result; @@ -3119,7 +3291,7 @@ public class CatalogServiceCatalog extends Catalog { try { return tbl.getMetrics().toString(); } finally { - tbl.getLock().unlock(); + tbl.releaseReadLock(); } } @@ -3235,8 +3407,7 @@ public class CatalogServiceCatalog extends Catalog { } Map<HdfsPartition, TPartialPartitionInfo> missingPartialInfos; TGetPartialCatalogObjectResponse resp; - // TODO(todd): consider a read-write lock here. - table.getLock().lock(); + table.takeReadLock(); try { if (table instanceof HdfsTable || table instanceof IcebergTable) { HdfsTable hdfsTable = table instanceof HdfsTable ? 
(HdfsTable) table : @@ -3252,7 +3423,7 @@ public class CatalogServiceCatalog extends Catalog { return table.getPartialInfo(req); } } finally { - table.getLock().unlock(); + table.releaseReadLock(); } } case FUNCTION: { diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java index 50fbde7..3d3b0c3 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java @@ -841,7 +841,7 @@ public class HdfsPartition extends CatalogObjectImpl */ public boolean removeFromVersionsForInflightEvents( boolean isInsertEvent, long versionNumber) { - Preconditions.checkState(table_.getLock().isHeldByCurrentThread(), + Preconditions.checkState(table_.isWriteLockedByCurrentThread(), "removeFromVersionsForInflightEvents called without holding the table lock on " + "partition " + getPartitionName() + " of table " + table_.getFullName()); return inFlightEvents_.remove(isInsertEvent, versionNumber); @@ -855,7 +855,7 @@ public class HdfsPartition extends CatalogObjectImpl * when isInsertEvent is false, it's version number to add */ public void addToVersionsForInflightEvents(boolean isInsertEvent, long versionNumber) { - Preconditions.checkState(table_.getLock().isHeldByCurrentThread(), + Preconditions.checkState(table_.isWriteLockedByCurrentThread(), "addToVersionsForInflightEvents called without holding the table lock on " + "partition " + getPartitionName() + " of table " + table_.getFullName()); if (!inFlightEvents_.add(isInsertEvent, versionNumber)) { @@ -1452,7 +1452,7 @@ public class HdfsPartition extends CatalogObjectImpl */ public void addToVersionsForInflightEvents(boolean isInsertEvent, long versionNumber) { - Preconditions.checkState(table_.getLock().isHeldByCurrentThread(), + Preconditions.checkState(table_.isWriteLockedByCurrentThread(), "addToVersionsForInflightEvents called without holding the table lock on " + 
"partition " + getPartitionName() + " of table " + table_.getFullName()); if (!inFlightEvents_.add(isInsertEvent, versionNumber)) { diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index 77825cc..05520e2 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -295,6 +295,21 @@ public class HdfsTable extends Table implements FeFsTable { // in coordinator's cache if there are no updates on them. private final Set<HdfsPartition> droppedPartitions_ = new HashSet<>(); + // pendingVersionNumber indicates a version number allocated to this HdfsTable for a + // ongoing DDL operation. This is mainly used by the topic update thread to skip a + // table from the topic updates if it cannot acquire lock on this table. The topic + // update thread bumps up this to a higher value outside the topic window so that + // the table is considered in the next update. The setCatalogVersion() makes use of this + // to eventually assign the catalogVersion to the table. + private long pendingVersionNumber_ = -1; + // lock protecting access to pendingVersionNumber + private final Object pendingVersionLock_ = new Object(); + + // this field is used to keep track of the last table version which is seen by the + // topic update thread. It is primarily used to identify distinct locking operations + // to determine if we can skip the table from the topic update. + private long lastVersionSeenByTopicUpdate_ = -1; + // Represents a set of storage-related statistics aggregated at the table or partition // level. public final static class FileMetadataStats { @@ -2647,4 +2662,63 @@ public class HdfsTable extends Table implements FeFsTable { public ValidWriteIdList getValidWriteIds() { return validWriteIds_; } + + /** + * Updates the pending version of this table if the tbl version matches with the + * expectedTblVersion. 
+ * @return true if the pending version was updated. Else, false. + */ + public boolean updatePendingVersion(long expectedTblVersion, long newPendingVersion) { + synchronized (pendingVersionLock_) { + if (expectedTblVersion == getCatalogVersion()) { + pendingVersionNumber_ = newPendingVersion; + return true; + } + return false; + } + } + + /** + * Sets the version of this table. This makes sure that if there is a + * pendingVersionNumber which is higher than the given version, it uses the + * pendingVersionNumber. A pendingVersionNumber which is higher than given version + * represents that the topic update thread tried to add this table to a update but + * couldn't. Hence it needs the ongoing update operation (represented by the given + * version) to use a higher version number so that this table falls within the next + * topic update window. + */ + @Override + public void setCatalogVersion(long version) { + synchronized (pendingVersionLock_) { + long versionToBeSet = version; + if (pendingVersionNumber_ > version) { + LOG.trace("Pending table version {} is higher than requested version {}", + pendingVersionNumber_, version); + versionToBeSet = pendingVersionNumber_; + } + LOG.trace("Setting the hdfs table {} version {}", getFullName(), versionToBeSet); + super.setCatalogVersion(versionToBeSet); + } + } + + /** + * Returns the last version of this table which was seen by the topic update thread + * when it could not acquire the table lock. This is used to determine if the topic + * update thread has skipped this table enough number of times that we should now + * block the topic updates until we add this table. Note that + * this method is not thread-safe and assumes that this only called from the + * topic-update thread. + */ + public long getLastVersionSeenByTopicUpdate() { + return lastVersionSeenByTopicUpdate_; + } + + /** + * Sets the version as seen by the topic update thread if it skips the table. 
Note that + * this method is not thread-safe and assumes that this only called from the + * topic-update thread. + */ + public void setLastVersionSeenByTopicUpdate(long version) { + lastVersionSeenByTopicUpdate_ = version; + } } diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java index 3eb7802..910bd49 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Table.java +++ b/fe/src/main/java/org/apache/impala/catalog/Table.java @@ -25,8 +25,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; - +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; @@ -78,8 +79,14 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { protected final String name_; protected final String owner_; protected TAccessLevel accessLevel_ = TAccessLevel.READ_WRITE; - // Lock protecting this table - private final ReentrantLock tableLock_ = new ReentrantLock(); + // Lock protecting this table. A read lock must be table when we are serializing + // the table contents over thrift (e.g when returning the table to clients over thrift + // or when topic-update thread serializes the table in the topic update) + // A write lock must be table when the table is being modified (e.g. DDLs or refresh) + private final ReentrantReadWriteLock tableLock_ = new ReentrantReadWriteLock( + true /*fair ordering*/); + private final ReadLock readLock_ = tableLock_.readLock(); + private final WriteLock writeLock_ = tableLock_.writeLock(); // Number of clustering columns. 
protected int numClusteringCols_; @@ -203,7 +210,52 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { .parseBoolean(msTbl.getParameters().get(TBL_PROP_EXTERNAL_TABLE_PURGE)); } - public ReentrantLock getLock() { return tableLock_; } + public void takeReadLock() { + readLock_.lock(); + } + + public ReadLock readLock() { return readLock_; } + public WriteLock writeLock() { return writeLock_; } + + public void releaseReadLock() { + readLock_.unlock(); + } + + public boolean isReadLockedByCurrentThread() { + return tableLock_.getReadHoldCount() > 0; + } + + public boolean tryReadLock() { + try { + // a tryLock with a 0 timeout honors the fairness for lock acquisition + // in case there are other threads waiting to acquire a read lock when + // compared to tryLock() which "barges" in and takes a read lock if + // available with no consideration to other threads waiting for a read lock. + return readLock_.tryLock(0, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // ignored + } + return false; + } + + public void releaseWriteLock() { + writeLock_.unlock(); + } + + /** + * Returns true if this table is write locked by current thread. + */ + public boolean isWriteLockedByCurrentThread() { + return writeLock_.isHeldByCurrentThread(); + } + + /** + * Returns true if this table is write locked by any thread. + */ + public boolean isWriteLocked() { + return tableLock_.isWriteLocked(); + } + @Override public abstract TTableDescriptor toThriftDescriptor( int tableId, Set<Long> referencedPartitions); @@ -492,7 +544,7 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { // the table lock should already be held, and we want the toThrift() to be consistent // with the modification. So this check helps us identify places where the lock // acquisition is probably missing entirely. 
- if (!tableLock_.isHeldByCurrentThread()) { + if (!isLockedByCurrentThread()) { throw new IllegalStateException( "Table.toThrift() called without holding the table lock: " + getFullName() + " " + getClass().getName()); @@ -524,6 +576,10 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { return table; } + private boolean isLockedByCurrentThread() { + return isReadLockedByCurrentThread() || tableLock_.isWriteLockedByCurrentThread(); + } + public TCatalogObject toMinimalTCatalogObject() { return toMinimalTCatalogObjectHelper(); } @@ -809,7 +865,7 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { */ public boolean removeFromVersionsForInflightEvents( boolean isInsertEvent, long versionNumber) { - Preconditions.checkState(tableLock_.isHeldByCurrentThread(), + Preconditions.checkState(isWriteLockedByCurrentThread(), "removeFromVersionsForInFlightEvents called without taking the table lock on " + getFullName()); return inFlightEvents.remove(isInsertEvent, versionNumber); @@ -831,7 +887,7 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { // we generally don't take locks on Incomplete tables since they are atomically // replaced during load Preconditions.checkState( - this instanceof IncompleteTable || tableLock_.isHeldByCurrentThread()); + this instanceof IncompleteTable || isWriteLockedByCurrentThread()); if (!inFlightEvents.add(isInsertEvent, versionNumber)) { LOG.warn(String.format("Could not add %s version to the table %s. 
This could " + "cause unnecessary refresh of the table when the event is received by the " diff --git a/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java b/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java index be80b3b..040e0cf 100644 --- a/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java +++ b/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java @@ -55,7 +55,8 @@ public class TopicUpdateLog { // associated with a catalog object and stores information about the last topic update // that processed that object. public static class Entry { - // Number of times the entry has skipped a topic update. + // Number of times the entry has skipped a topic update because the table version + // was out of the requested update version window. private final int numSkippedUpdates_; // Last version of the corresponding catalog object that was added to a topic // update. -1 if the object was never added to a topic update. @@ -63,30 +64,41 @@ public class TopicUpdateLog { // Version of the last topic update to include the corresponding catalog object. // -1 if the object was never added to a topic update. 
private final long lastSentTopicUpdate_; + // number of time the topic update skipped this table due to lock contention + private final int numSkippedUpdatesLockContention_; Entry() { numSkippedUpdates_ = 0; lastSentVersion_ = -1; lastSentTopicUpdate_ = -1; + numSkippedUpdatesLockContention_ = 0; } - Entry(int numSkippedUpdates, long lastSentVersion, long lastSentCatalogUpdate) { + Entry(int numSkippedUpdates, long lastSentVersion, long lastSentCatalogUpdate, + int numSkippedUpdatesLockContention) { numSkippedUpdates_ = numSkippedUpdates; lastSentVersion_ = lastSentVersion; lastSentTopicUpdate_ = lastSentCatalogUpdate; + numSkippedUpdatesLockContention_ = numSkippedUpdatesLockContention; } public int getNumSkippedTopicUpdates() { return numSkippedUpdates_; } public long getLastSentVersion() { return lastSentVersion_; } public long getLastSentCatalogUpdate() { return lastSentTopicUpdate_; } + public int getNumSkippedUpdatesLockContention() { + return numSkippedUpdatesLockContention_; + } + @Override public boolean equals(Object other) { if (this.getClass() != other.getClass()) return false; Entry entry = (Entry) other; return numSkippedUpdates_ == entry.getNumSkippedTopicUpdates() && lastSentVersion_ == entry.getLastSentVersion() - && lastSentTopicUpdate_ == entry.getLastSentCatalogUpdate(); + && lastSentTopicUpdate_ == entry.getLastSentCatalogUpdate() + && numSkippedUpdatesLockContention_ == entry + .getNumSkippedUpdatesLockContention(); } } diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index fcf3cb5..3841dfd 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -98,7 +98,6 @@ import org.apache.impala.catalog.HdfsFileFormat; import org.apache.impala.catalog.HdfsPartition; import org.apache.impala.catalog.HdfsTable; import 
org.apache.impala.catalog.HiveStorageDescriptorFactory; -import org.apache.impala.catalog.IcebergTable; import org.apache.impala.catalog.IncompleteTable; import org.apache.impala.catalog.KuduTable; import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; @@ -594,11 +593,11 @@ public class CatalogOpExecutor { // The table lock is needed here since toTCatalogObject() calls Table#toThrift() // which expects the current thread to hold this lock. For more details refer // to IMPALA-4092. - t.getLock().lock(); + t.takeReadLock(); try { response.result.addToUpdated_catalog_objects(t.toTCatalogObject()); } finally { - t.getLock().unlock(); + t.releaseReadLock(); } } } @@ -666,7 +665,7 @@ public class CatalogOpExecutor { String.format("Can't rename to blacklisted table name: %s. %s", newTableName, BLACKLISTED_DBS_INCONSISTENT_ERR_STR)); } - tryLock(tbl); + tryWriteLock(tbl); // Get a new catalog version to assign to the table being altered. long newCatalogVersion = catalog_.incrementAndGetCatalogVersion(); addCatalogServiceIdentifiers(tbl, catalog_.getCatalogServiceId(), newCatalogVersion); @@ -884,7 +883,7 @@ public class CatalogOpExecutor { UnlockWriteLockIfErronouslyLocked(); // Clear in-progress modifications in case of exceptions. 
tbl.resetInProgressModification(); - tbl.getLock().unlock(); + tbl.releaseWriteLock(); } } @@ -906,7 +905,7 @@ public class CatalogOpExecutor { private void alterKuduTable(TAlterTableParams params, TDdlExecResponse response, KuduTable tbl, long newCatalogVersion, boolean wantMinimalResult) throws ImpalaException { - Preconditions.checkState(tbl.getLock().isHeldByCurrentThread()); + Preconditions.checkState(tbl.isWriteLockedByCurrentThread()); switch (params.getAlter_type()) { case ADD_COLUMNS: TAlterTableAddColsParams addColParams = params.getAdd_cols_params(); @@ -965,7 +964,7 @@ public class CatalogOpExecutor { private void alterIcebergTable(TAlterTableParams params, TDdlExecResponse response, IcebergTable tbl, long newCatalogVersion, boolean wantMinimalResult) throws ImpalaException { - Preconditions.checkState(tbl.getLock().isHeldByCurrentThread()); + Preconditions.checkState(tbl.isWriteLockedByCurrentThread()); switch (params.getAlter_type()) { case ADD_COLUMNS: TAlterTableAddColsParams addColParams = params.getAdd_cols_params(); @@ -1008,7 +1007,7 @@ public class CatalogOpExecutor { private void loadTableMetadata(Table tbl, long newCatalogVersion, boolean reloadFileMetadata, boolean reloadTableSchema, Set<String> partitionsToUpdate, String reason) throws CatalogException { - Preconditions.checkState(tbl.getLock().isHeldByCurrentThread()); + Preconditions.checkState(tbl.isWriteLockedByCurrentThread()); try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable(msClient, tbl); @@ -1071,7 +1070,7 @@ public class CatalogOpExecutor { "Load for ALTER VIEW"); Preconditions.checkState(tbl instanceof View, "Expected view: %s", tableName); - tryLock(tbl); + tryWriteLock(tbl); try { long newCatalogVersion = catalog_.incrementAndGetCatalogVersion(); catalog_.getLock().writeLock().unlock(); @@ -1102,7 +1101,7 @@ public class CatalogOpExecutor { addTableToCatalogUpdate(tbl, wantMinimalResult, 
resp.result); } finally { UnlockWriteLockIfErronouslyLocked(); - tbl.getLock().unlock(); + tbl.releaseWriteLock(); } } @@ -1136,7 +1135,7 @@ public class CatalogOpExecutor { Reference<Long> numUpdatedPartitions, Reference<Long> numUpdatedColumns, @Nullable String debugAction) throws ImpalaException { - Preconditions.checkState(table.getLock().isHeldByCurrentThread()); + Preconditions.checkState(table.isWriteLockedByCurrentThread()); Preconditions.checkState(params.isSetTable_stats() || params.isSetColumn_stats()); TableName tableName = table.getTableName(); @@ -1610,7 +1609,7 @@ public class CatalogOpExecutor { Preconditions.checkState(!AcidUtils.isTransactionalTable( table.getMetaStoreTable().getParameters())); - tryLock(table, "dropping stats"); + tryWriteLock(table, "dropping stats"); try { long newCatalogVersion = catalog_.incrementAndGetCatalogVersion(); catalog_.getLock().writeLock().unlock(); @@ -1643,7 +1642,7 @@ public class CatalogOpExecutor { addSummary(resp, "Stats have been dropped."); } finally { UnlockWriteLockIfErronouslyLocked(); - table.getLock().unlock(); + table.releaseWriteLock(); } } @@ -1652,7 +1651,7 @@ public class CatalogOpExecutor { * that were updated as part of this operation. */ private int dropColumnStats(Table table) throws ImpalaRuntimeException { - Preconditions.checkState(table.getLock().isHeldByCurrentThread()); + Preconditions.checkState(table.isWriteLockedByCurrentThread()); int numColsUpdated = 0; try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { for (Column col: table.getColumns()) { @@ -1684,7 +1683,7 @@ public class CatalogOpExecutor { * is unpartitioned. */ private int dropTableStats(Table table) throws ImpalaException { - Preconditions.checkState(table.getLock().isHeldByCurrentThread()); + Preconditions.checkState(table.isWriteLockedByCurrentThread()); // Delete the ROW_COUNT from the table (if it was set). 
org.apache.hadoop.hive.metastore.api.Table msTbl = table.getMetaStoreTable(); int numTargetedPartitions = 0; @@ -2130,7 +2129,7 @@ public class CatalogOpExecutor { // If transactional, the lock will be released for some time to acquire the HMS Acid // lock. It's safe because transactional -> non-transactional conversion is not // allowed. - tryLock(table, "truncating"); + tryWriteLock(table, "truncating"); try { long newCatalogVersion = 0; try { @@ -2150,8 +2149,8 @@ public class CatalogOpExecutor { addTableToCatalogUpdate(table, wantMinimalResult, resp.result); } finally { UnlockWriteLockIfErronouslyLocked(); - if (table.getLock().isHeldByCurrentThread()) { - table.getLock().unlock(); + if (table.isWriteLockedByCurrentThread()) { + table.releaseWriteLock(); } } } @@ -2166,7 +2165,7 @@ public class CatalogOpExecutor { */ private long truncateTransactionalTable(TTruncateParams params, Table table) throws ImpalaException { - Preconditions.checkState(table.getLock().isHeldByCurrentThread()); + Preconditions.checkState(table.isWriteLockedByCurrentThread()); Preconditions.checkState(catalog_.getLock().isWriteLockedByCurrentThread()); catalog_.getLock().writeLock().unlock(); TableName tblName = TableName.fromThrift(params.getTable_name()); @@ -2180,11 +2179,11 @@ public class CatalogOpExecutor { Preconditions.checkState(txn.getId() > 0); // We need to release catalog table lock here, because HMS Acid table lock // must be locked in advance to avoid dead lock. - table.getLock().unlock(); + table.releaseWriteLock(); //TODO: if possible, set DataOperationType to something better than NO_TXN. 
catalog_.lockTableInTransaction(tblName.getDb(), tblName.getTbl(), txn, DataOperationType.NO_TXN, ctx); - tryLock(table, "truncating"); + tryWriteLock(table, "truncating"); newCatalogVersion = catalog_.incrementAndGetCatalogVersion(); catalog_.getLock().writeLock().unlock(); TblTransaction tblTxn = MetastoreShim.createTblTransaction(hmsClient, @@ -2244,7 +2243,7 @@ public class CatalogOpExecutor { */ private boolean isTableBeingReplicated(IMetaStoreClient metastoreClient, HdfsTable tbl) throws CatalogException { - Preconditions.checkState(tbl.getLock().isHeldByCurrentThread()); + Preconditions.checkState(tbl.isWriteLockedByCurrentThread()); String dbName = tbl.getDb().getName(); try { Database db = metastoreClient.getDatabase(dbName); @@ -2260,7 +2259,7 @@ public class CatalogOpExecutor { private long truncateNonTransactionalTable(TTruncateParams params, Table table) throws Exception { - Preconditions.checkState(table.getLock().isHeldByCurrentThread()); + Preconditions.checkState(table.isWriteLockedByCurrentThread()); Preconditions.checkState(catalog_.getLock().isWriteLockedByCurrentThread()); long newCatalogVersion = catalog_.incrementAndGetCatalogVersion(); catalog_.getLock().writeLock().unlock(); @@ -2399,7 +2398,7 @@ public class CatalogOpExecutor { addSummary(response, "Table already exists."); LOG.trace("Skipping table creation because {} already exists and " + "IF NOT EXISTS was specified.", tableName); - tryLock(existingTbl); + tryWriteLock(existingTbl); try { if (syncDdl) { // When SYNC_DDL is enabled and the table already exists, we force a version @@ -2421,7 +2420,7 @@ public class CatalogOpExecutor { } finally { // Release the locks held in tryLock(). 
catalog_.getLock().writeLock().unlock(); - existingTbl.getLock().unlock(); + existingTbl.releaseWriteLock(); } return false; } @@ -2838,7 +2837,7 @@ public class CatalogOpExecutor { addSummary(response, "Table already exists."); LOG.trace(String.format("Skipping table creation because %s already exists and " + "IF NOT EXISTS was specified.", tblName)); - tryLock(existingTbl); + tryWriteLock(existingTbl); try { if (syncDdl) { // When SYNC_DDL is enabled and the table already exists, we force a version @@ -2860,7 +2859,7 @@ public class CatalogOpExecutor { } finally { // Release the locks held in tryLock(). catalog_.getLock().writeLock().unlock(); - existingTbl.getLock().unlock(); + existingTbl.releaseWriteLock(); } return; } @@ -3000,7 +2999,7 @@ public class CatalogOpExecutor { */ private boolean alterTableAddCols(Table tbl, List<TColumn> columns, boolean ifNotExists) throws ImpalaException { - Preconditions.checkState(tbl.getLock().isHeldByCurrentThread()); + Preconditions.checkState(tbl.isWriteLockedByCurrentThread()); org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy(); List<TColumn> colsToAdd = new ArrayList<>(); for (TColumn column: columns) { @@ -3028,7 +3027,7 @@ public class CatalogOpExecutor { */ private void alterTableReplaceCols(Table tbl, List<TColumn> columns) throws ImpalaException { - Preconditions.checkState(tbl.getLock().isHeldByCurrentThread()); + Preconditions.checkState(tbl.isWriteLockedByCurrentThread()); org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy(); List<FieldSchema> newColumns = buildFieldSchemaList(columns); msTbl.getSd().setCols(newColumns); @@ -3048,7 +3047,7 @@ public class CatalogOpExecutor { */ private void alterTableAlterCol(Table tbl, String colName, TColumn newCol) throws ImpalaException { - Preconditions.checkState(tbl.getLock().isHeldByCurrentThread()); + Preconditions.checkState(tbl.isWriteLockedByCurrentThread()); org.apache.hadoop.hive.metastore.api.Table 
msTbl = tbl.getMetaStoreTable().deepCopy(); // Find the matching column name and change it. Iterator<FieldSchema> iterator = msTbl.getSd().getColsIterator(); @@ -3096,7 +3095,7 @@ public class CatalogOpExecutor { */ private Table alterTableAddPartitions(Table tbl, TAlterTableAddPartitionParams addPartParams) throws ImpalaException { - Preconditions.checkState(tbl.getLock().isHeldByCurrentThread()); + Preconditions.checkState(tbl.isWriteLockedByCurrentThread()); TableName tableName = tbl.getTableName(); org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy(); @@ -3269,7 +3268,7 @@ public class CatalogOpExecutor { List<List<TPartitionKeyValue>> partitionSet, boolean ifExists, boolean purge, Reference<Long> numUpdatedPartitions) throws ImpalaException { - Preconditions.checkState(tbl.getLock().isHeldByCurrentThread()); + Preconditions.checkState(tbl.isWriteLockedByCurrentThread()); Preconditions.checkNotNull(partitionSet); TableName tableName = tbl.getTableName(); @@ -3323,7 +3322,7 @@ public class CatalogOpExecutor { * Removes a column from the given table. */ private void alterTableDropCol(Table tbl, String colName) throws ImpalaException { - Preconditions.checkState(tbl.getLock().isHeldByCurrentThread()); + Preconditions.checkState(tbl.isWriteLockedByCurrentThread()); org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy(); // Find the matching column name and remove it. 
Iterator<FieldSchema> iterator = msTbl.getSd().getColsIterator(); @@ -3355,7 +3354,7 @@ public class CatalogOpExecutor { private void alterTableOrViewRename(Table oldTbl, TableName newTableName, long newCatalogVersion, boolean wantMinimalResult, TDdlExecResponse response) throws ImpalaException { - Preconditions.checkState(oldTbl.getLock().isHeldByCurrentThread() + Preconditions.checkState(oldTbl.isWriteLockedByCurrentThread() && catalog_.getLock().isWriteLockedByCurrentThread()); TableName tableName = oldTbl.getTableName(); org.apache.hadoop.hive.metastore.api.Table msTbl = @@ -3466,7 +3465,7 @@ public class CatalogOpExecutor { List<List<TPartitionKeyValue>> partitionSet, THdfsFileFormat fileFormat, Reference<Long> numUpdatedPartitions) throws ImpalaException { - Preconditions.checkState(tbl.getLock().isHeldByCurrentThread()); + Preconditions.checkState(tbl.isWriteLockedByCurrentThread()); boolean reloadFileMetadata = false; if (partitionSet == null) { org.apache.hadoop.hive.metastore.api.Table msTbl = @@ -3501,7 +3500,7 @@ public class CatalogOpExecutor { List<List<TPartitionKeyValue>> partitionSet, TTableRowFormat tRowFormat, Reference<Long> numUpdatedPartitions) throws ImpalaException { - Preconditions.checkState(tbl.getLock().isHeldByCurrentThread()); + Preconditions.checkState(tbl.isWriteLockedByCurrentThread()); Preconditions.checkArgument(tbl instanceof HdfsTable); boolean reloadFileMetadata = false; RowFormat rowFormat = RowFormat.fromThrift(tRowFormat); @@ -3548,7 +3547,7 @@ public class CatalogOpExecutor { */ private boolean alterTableSetLocation(Table tbl, List<TPartitionKeyValue> partitionSpec, String location) throws ImpalaException { - Preconditions.checkState(tbl.getLock().isHeldByCurrentThread()); + Preconditions.checkState(tbl.isWriteLockedByCurrentThread()); boolean reloadFileMetadata = false; if (partitionSpec == null) { org.apache.hadoop.hive.metastore.api.Table msTbl = @@ -3578,7 +3577,7 @@ public class CatalogOpExecutor { private void 
alterTableSetTblProperties(Table tbl, TAlterTableSetTblPropertiesParams params, Reference<Long> numUpdatedPartitions) throws ImpalaException { - Preconditions.checkState(tbl.getLock().isHeldByCurrentThread()); + Preconditions.checkState(tbl.isWriteLockedByCurrentThread()); Map<String, String> properties = params.getProperties(); Preconditions.checkNotNull(properties); if (params.isSetPartition_set()) { @@ -3657,7 +3656,7 @@ public class CatalogOpExecutor { */ private boolean alterTableSetCached(Table tbl, TAlterTableSetCachedParams params) throws ImpalaException { - Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread()); + Preconditions.checkArgument(tbl.isWriteLockedByCurrentThread()); THdfsCachingOp cacheOp = params.getCache_op(); Preconditions.checkNotNull(cacheOp); // Alter table params. @@ -3785,7 +3784,7 @@ public class CatalogOpExecutor { private void alterPartitionSetCached(Table tbl, TAlterTableSetCachedParams params, Reference<Long> numUpdatedPartitions) throws ImpalaException { - Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread()); + Preconditions.checkArgument(tbl.isWriteLockedByCurrentThread()); THdfsCachingOp cacheOp = params.getCache_op(); Preconditions.checkNotNull(cacheOp); Preconditions.checkNotNull(params.getPartition_set()); @@ -3847,7 +3846,7 @@ public class CatalogOpExecutor { */ private void alterTableRecoverPartitions(Table tbl, @Nullable String debugAction) throws ImpalaException { - Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread()); + Preconditions.checkArgument(tbl.isWriteLockedByCurrentThread()); if (!(tbl instanceof HdfsTable)) { throw new CatalogException("Table " + tbl.getFullName() + " is not an HDFS table"); } @@ -4493,11 +4492,11 @@ public class CatalogOpExecutor { } } else { // Table was loaded from scratch, so it's already "refreshed". 
- tbl.getLock().lock(); + tbl.takeReadLock(); try { updatedThriftTable = tbl.toTCatalogObject(resultType); } finally { - tbl.getLock().unlock(); + tbl.releaseReadLock(); } } } @@ -4572,7 +4571,7 @@ public class CatalogOpExecutor { update.getTarget_table()); } - tryLock(table, "updating the catalog"); + tryWriteLock(table, "updating the catalog"); final Timer.Context context = table.getMetrics().getTimer(HdfsTable.CATALOG_UPDATE_DURATION_METRIC).time(); @@ -4809,7 +4808,7 @@ public class CatalogOpExecutor { } finally { context.stop(); UnlockWriteLockIfErronouslyLocked(); - table.getLock().unlock(); + table.releaseWriteLock(); } if (update.isSync_ddl()) { @@ -5175,7 +5174,7 @@ public class CatalogOpExecutor { throws CatalogException, InternalException, ImpalaRuntimeException { Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(), "Load for ALTER COMMENT"); - tryLock(tbl); + tryWriteLock(tbl); try { long newCatalogVersion = catalog_.incrementAndGetCatalogVersion(); catalog_.getLock().writeLock().unlock(); @@ -5195,7 +5194,7 @@ public class CatalogOpExecutor { addTableToCatalogUpdate(tbl, wantMinimalResult, response.result); addSummary(response, String.format("Updated %s.", (isView) ? 
"view" : "table")); } finally { - tbl.getLock().unlock(); + tbl.releaseWriteLock(); } } @@ -5204,7 +5203,7 @@ public class CatalogOpExecutor { throws CatalogException, InternalException, ImpalaRuntimeException { Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(), "Load for ALTER COLUMN COMMENT"); - tryLock(tbl); + tryWriteLock(tbl); try { long newCatalogVersion = catalog_.incrementAndGetCatalogVersion(); catalog_.getLock().writeLock().unlock(); @@ -5230,7 +5229,7 @@ public class CatalogOpExecutor { addTableToCatalogUpdate(tbl, wantMinimalResult, response.result); addSummary(response, "Column has been altered."); } finally { - tbl.getLock().unlock(); + tbl.releaseWriteLock(); } } @@ -5251,20 +5250,20 @@ public class CatalogOpExecutor { } /** - * Try to lock a table in the catalog. Throw an InternalException if the catalog is - * unable to lock the given table. + * Tries to take the write lock of the table in the catalog. Throws an InternalException + * if the catalog is unable to lock the given table. */ - private void tryLock(Table tbl) throws InternalException { - tryLock(tbl, "altering"); + private void tryWriteLock(Table tbl) throws InternalException { + tryWriteLock(tbl, "altering"); } /** * Try to lock a table in the catalog for a given operation. Throw an InternalException * if the catalog is unable to lock the given table. */ - private void tryLock(Table tbl, String operation) throws InternalException { + private void tryWriteLock(Table tbl, String operation) throws InternalException { String type = tbl instanceof View ? 
"view" : "table"; - if (!catalog_.tryLockTable(tbl)) { + if (!catalog_.tryWriteLock(tbl)) { throw new InternalException(String.format("Error %s (for) %s %s due to " + "lock contention.", operation, type, tbl.getFullName())); } diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java index 4138c2f..9789bc6 100644 --- a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java @@ -264,11 +264,11 @@ public class CatalogObjectToFromThriftTest { private TTable getThriftTable(Table table) { TTable thriftTable = null; - table.getLock().lock(); + table.takeReadLock(); try { thriftTable = table.toThrift(); } finally { - table.getLock().unlock(); + table.releaseReadLock(); } return thriftTable; } diff --git a/tests/custom_cluster/test_topic_update_frequency.py b/tests/custom_cluster/test_topic_update_frequency.py new file mode 100644 index 0000000..65f3f73 --- /dev/null +++ b/tests/custom_cluster/test_topic_update_frequency.py @@ -0,0 +1,213 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from multiprocessing.pool import ThreadPool + +import pytest +import time + +from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.skip import SkipIfS3 + + [email protected]_listing_times +class TestTopicUpdateFrequency(CustomClusterTestSuite): + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + catalogd_args="--topic_update_tbl_max_wait_time_ms=500") + def test_topic_updates_unblock(self): + """Test to simulate query blocking conditions as per IMPALA-6671 + and makes sure that unrelated queries are not blocked by other long running + queries which block topic updates.""" + # queries that we don't expect to block when a slow running blocking query is + # running in parallel. We want these queries to request the metadata from catalogd + # and hence the init queries invalidate the metadata before each test case run below. + non_blocking_queries = [ + # each of these take about 2-4 seconds when there is no lock contention. + "describe functional.emptytable", + "select * from functional.tinytable limit 1", + "show partitions functional.alltypessmall", + ] + # queries used to reset the metadata of the non_blocking_queries so that they will + # reload the next time they are executed + init_queries = [ + "invalidate metadata functional.emptytable", + "invalidate metadata functional.tinytable", + "invalidate metadata functional.alltypessmall", + ] + # make sure that the blocking query metadata is loaded in catalogd since table lock + # is only acquired on loaded tables. + self.client.execute("refresh tpcds.store_sales") + self.client.execute("refresh functional.alltypes") + # add the debug actions so that blocking queries take long time complete while + # holding the table lock. These debug actions are tuned such that each of the blocking + # queries below take little more than 10 seconds (2x slower than fast queries). 
+ debug_action = "catalogd_refresh_hdfs_listing_delay:SLEEP@30|catalogd_table_recover_delay:SLEEP@10000|catalogd_update_stats_delay:SLEEP@10000" + blocking_query_options = { + "debug_action": debug_action, + "sync_ddl": "false" + } + blocking_queries = [ + "refresh tpcds.store_sales", + "alter table tpcds.store_sales recover partitions", + "compute stats functional.alltypes" + ] + + for blocking_query in blocking_queries: + print("Running blocking query: {0}".format(blocking_query)) + # blocking query is without sync_ddl + blocking_query_options["sync_ddl"] = "false" + self.__run_topic_update_test(blocking_query, + non_blocking_queries, init_queries, blocking_query_options=blocking_query_options) + # blocking query is with sync_ddl + blocking_query_options["sync_ddl"] = "true" + self.__run_topic_update_test(blocking_query, + non_blocking_queries, init_queries, blocking_query_options=blocking_query_options) + non_blocking_query_options = { + "sync_ddl": "true", + } + self.__run_topic_update_test(blocking_query, + non_blocking_queries, init_queries, blocking_query_options=blocking_query_options, + non_blocking_query_options=non_blocking_query_options) + blocking_query_options["sync_ddl"] = "false" + self.__run_topic_update_test(blocking_query, + non_blocking_queries, init_queries, blocking_query_options=blocking_query_options, + non_blocking_query_options=non_blocking_query_options) + + def __run_topic_update_test(self, slow_blocking_query, fast_queries, + init_queries, blocking_query_options, + non_blocking_query_options=None, blocking_query_min_time=10000, + fast_query_timeout_ms=6000, non_blocking_impalad=0, + expect_topic_updates_to_block=False): + """This function runs the slow query in a Impala client and then creates separate + Impala clients to run the fast_queries. 
It makes sure that the + fast_queries don't get blocked by the slow query by making sure that + fast_queries return back before the blocking query within a given expected + timeout.""" + assert fast_query_timeout_ms < blocking_query_min_time + # run the init queries first in sync_ddl mode so that all the impalads are starting + # with a clean state. + for q in init_queries: + self.execute_query(q) + + pool = ThreadPool(processes=len(fast_queries)) + slow_query_pool = ThreadPool(processes=len(slow_blocking_query)) + # run the slow query on the impalad-0 with the given query options + slow_query_future = slow_query_pool.apply_async(self.exec_and_time, + args=(slow_blocking_query, blocking_query_options, 0)) + # there is no other good way other than to wait for some time to make sure that + # the slow query has been submitted and being compiled before we start the + # non_blocking queries + time.sleep(1) + fast_query_futures = {} + for fast_query in fast_queries: + # run other queries on second impalad + fast_query_futures[fast_query] = pool.apply_async(self.exec_and_time, + args=(fast_query, non_blocking_query_options, non_blocking_impalad)) + + for fast_query in fast_query_futures: + if not expect_topic_updates_to_block: + assert fast_query_futures[ + fast_query].get() < fast_query_timeout_ms, \ + "{0} did not complete within {1} msec".format(fast_query, fast_query_timeout_ms) + else: + # topic updates are expected to block and hence all the other queries should run + # only after blocking query finishes. + fast_query_futures[ + fast_query].get() > blocking_query_min_time, \ + "{0} did not complete within {1} msec".format(fast_query, fast_query_timeout_ms) + # make sure that the slow query exceeds the given timeout; otherwise the test + # doesn't make much sense. 
+ assert slow_query_future.get() > blocking_query_min_time, \ + "{0} query took less time than {1} msec".format(slow_blocking_query, + blocking_query_min_time) + # we wait for some time here to make sure that the topic updates from the last + # query have been propagated so that next run of this method starts from a clean + # state. + time.sleep(2) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + catalogd_args="--topic_update_tbl_max_wait_time_ms=500") + def test_topic_updates_advance(self): + """Test make sure that a if long running blocking queries are run continuously + topic-update thread is not starved and it eventually blocks until it acquires a table + lock.""" + # Each of these queries take complete about 30s with the debug action delays + # below. + blocking_queries = [ + "refresh tpcds.store_sales", + "alter table tpcds.store_sales recover partitions", + "compute stats functional.alltypes" + ] + debug_action = "catalogd_refresh_hdfs_listing_delay:SLEEP@30|catalogd_table_recover_delay:SLEEP@10000|catalogd_update_stats_delay:SLEEP@10000" + # loop in sync_ddl mode so that we know the topic updates are being propagated. + blocking_query_options = { + "debug_action": debug_action, + "sync_ddl": "true" + } + self.__run_loop_test(blocking_queries, blocking_query_options, 60000) + + def __run_loop_test(self, blocking_queries, blocking_query_options, timeout): + """Runs the given list of queries with given query options in a loop + and makes sure that they complete without any errors.""" + slow_query_pool = ThreadPool(processes=len(blocking_queries)) + # run the slow query on the impalad-0 with the given query options + slow_query_futures = {} + for q in blocking_queries: + print("Running blocking query {0}".format(q)) + slow_query_futures[q] = slow_query_pool.apply_async(self.loop_exec, + args=(q, blocking_query_options)) + + for q in slow_query_futures: + # make sure that queries complete eventually. 
+ durations = slow_query_futures[q].get() + for i in range(len(durations)): + assert durations[i] < timeout, "Query {0} iteration {1} did " \ + "not complete within {2}.".format(q, i, timeout) + + def loop_exec(self, query, query_options, iterations=3, impalad=0): + durations = [] + for iter in range(iterations): + durations.append(self.exec_and_time(query, query_options, impalad)) + return durations + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + catalogd_args="--topic_update_tbl_max_wait_time_ms=0") + def test_topic_lock_timeout_disabled(self): + """Test makes sure that the topic update thread blocks until tables are + added to each topic update when topic_update_tbl_max_wait_time_ms is set to 0""" + # queries that we don't expect to block when a slow running blocking query is + # running in parallel. We want these queries to request the metadata from catalogd + # and hence the init queries invalidate the metadata before each test case run below. + non_blocking_queries = [ + # each of these take about 2-4 seconds when there is no lock contention. + "describe functional.emptytable" + ] + # queries used to reset the metadata of the non_blocking_queries so that they will + # reload the next time they are executed + init_queries = [ + "invalidate metadata functional.emptytable", + ] + # make sure that the blocking query metadata is loaded in catalogd since table lock + # is only acquired on loaded tables. + blocking_query = "refresh tpcds.store_sales" + debug_action = "catalogd_refresh_hdfs_listing_delay:SLEEP@30" + self.client.execute(blocking_query) + blocking_query_options = { + "debug_action": debug_action, + "sync_ddl": "false" + } + self.__run_topic_update_test(blocking_query, + non_blocking_queries, init_queries, blocking_query_options=blocking_query_options, + expect_topic_updates_to_block=True)
