This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 2fccd82590d747d834b8be6f3b05bb446d9bac12
Author: Vihang Karajgaonkar <[email protected]>
AuthorDate: Mon Oct 12 10:38:23 2020 -0700

    IMPALA-6671: Skip locked tables from topic updates
    
    This change adds a mechanism for topic-update thread
    to skip a table which is locked for more than a
    configurable interval from the topic updates. This is
    especially useful in scenarios where long running operations on a
    locked table (refresh, recover partitions, compute stats) block the
    topic update thread. This causes unrelated queries which are
    waiting on metadata via topic updates (catalog-v1 mode)
    to unnecessarily block.
    
    The ideal solution to this problem would be to make HdfsTable
    immutable so that there is no need for a table lock. But that
    is a large change and not easily portable to older releases
    of Impala. It would be taken up as a separate patch.
    
    This change introduces 2 new configurations for catalogd:
    
    1. topic_update_tbl_max_wait_time_ms: This defines the
    maximum time in msecs the topic update thread waits on a locked table
    before skipping the table from that iteration of topic updates.
    The default value is 500. If this configuration is set to 0,
    timeout-based locking for the topic update thread is disabled.
    2. catalog_max_lock_skipped_topic_updates: This defines
    the maximum number of distinct lock operations which are skipped
    by the topic update thread due to lock contention. Once this limit
    is reached, the topic update thread will block until it acquires
    the table lock and adds the table to the updates.
    
    Testing:
    1. Added a test case which introduces a simulated delay
    in a few potentially long running statements. This causes the table
    to be locked for a long time. The topic update thread skips
    that table from updates and unrelated queries are unblocked
    since they receive the required metadata from updates.
    2. Added a test where multiple threads run blocking statements
    in a loop to stress the table lock. It makes sure that the topic
    update thread is not starved and eventually blocks
    on the table lock by hitting the limit defined by
    catalog_max_lock_skipped_topic_updates.
    3. Ran exhaustive tests with default configurations.
    
    Change-Id: Ic657b96edbcdc94c6b906e7ca59291f4e4715655
    Reviewed-on: http://gerrit.cloudera.org:8080/16549
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/catalog/catalog-server.cc                   |  15 +
 be/src/util/backend-gflag-util.cc                  |   5 +
 common/thrift/BackendGflags.thrift                 |   4 +
 .../apache/impala/analysis/CopyTestCaseStmt.java   |   4 +-
 .../java/org/apache/impala/catalog/Catalog.java    |   4 +-
 .../impala/catalog/CatalogServiceCatalog.java      | 315 ++++++++++++++++-----
 .../org/apache/impala/catalog/HdfsPartition.java   |   6 +-
 .../java/org/apache/impala/catalog/HdfsTable.java  |  74 +++++
 .../main/java/org/apache/impala/catalog/Table.java |  72 ++++-
 .../org/apache/impala/catalog/TopicUpdateLog.java  |  18 +-
 .../apache/impala/service/CatalogOpExecutor.java   | 109 ++++---
 .../catalog/CatalogObjectToFromThriftTest.java     |   4 +-
 .../custom_cluster/test_topic_update_frequency.py  | 213 ++++++++++++++
 13 files changed, 696 insertions(+), 147 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index a66e256..e14c982 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -74,6 +74,21 @@ 
DEFINE_int64_hidden(catalog_partial_fetch_rpc_queue_timeout_s, LLONG_MAX, "Maxim
     "(in seconds) a partial catalog object fetch RPC spends in the queue 
waiting "
     "to run. Must be set to a value greater than zero.");
 
+DEFINE_int32(catalog_max_lock_skipped_topic_updates, 2, "Maximum number of 
topic "
+    "updates skipped for a table due to lock contention in catalogd after 
which it must"
+    "be added to the topic the update log. This limit only applies to distinct 
lock "
+    "operations which block the topic update thread.");
+
+DEFINE_int64(topic_update_tbl_max_wait_time_ms, 500, "Maximum time "
+     "(in milliseconds) catalog's topic update thread will wait to acquire 
lock on "
+     "table. If the topic update thread cannot acquire a table lock it skips 
the table "
+     "from that topic update and processes the table in the next update. 
However to "
+     "prevent starvation it only skips the table 
catalog_max_lock_skipped_topic_updates "
+     "many times. After that limit is hit, topic thread block until it 
acquires the "
+     "table lock. A value of 0 disables the timeout based locking which means 
topic "
+     "update thread will always block until table lock is acquired.");
+
+
 DECLARE_string(debug_actions);
 DECLARE_string(state_store_host);
 DECLARE_int32(state_store_subscriber_port);
diff --git a/be/src/util/backend-gflag-util.cc 
b/be/src/util/backend-gflag-util.cc
index b43c63e..754e9f2 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -85,6 +85,8 @@ DECLARE_bool(use_customized_user_groups_mapper_for_ranger);
 DECLARE_bool(enable_column_masking);
 DECLARE_bool(compact_catalog_topic);
 DECLARE_bool(enable_incremental_metadata_updates);
+DECLARE_int64(topic_update_tbl_max_wait_time_ms);
+DECLARE_int32(catalog_max_lock_skipped_topic_updates);
 
 namespace impala {
 
@@ -173,6 +175,9 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* 
cfg_bytes) {
   cfg.__set_compact_catalog_topic(FLAGS_compact_catalog_topic);
   cfg.__set_enable_incremental_metadata_updates(
       FLAGS_enable_incremental_metadata_updates);
+  
cfg.__set_topic_update_tbl_max_wait_time_ms(FLAGS_topic_update_tbl_max_wait_time_ms);
+  cfg.__set_catalog_max_lock_skipped_topic_updates(
+      FLAGS_catalog_max_lock_skipped_topic_updates);
   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
   return Status::OK();
 }
diff --git a/common/thrift/BackendGflags.thrift 
b/common/thrift/BackendGflags.thrift
index 89ce733..bd9d4b9 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -161,4 +161,8 @@ struct TBackendGflags {
   68: required bool compact_catalog_topic
 
   69: required bool enable_incremental_metadata_updates
+
+  70: required i64 topic_update_tbl_max_wait_time_ms
+
+  71: required i32 catalog_max_lock_skipped_topic_updates
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/CopyTestCaseStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/CopyTestCaseStmt.java
index 69adb81..f6aba15 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CopyTestCaseStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CopyTestCaseStmt.java
@@ -173,11 +173,11 @@ public class CopyTestCaseStmt extends StatementBase {
     }
     for (FeTable table: referencedTbls) {
       Preconditions.checkState(table instanceof FeTable);
-      ((Table) table).getLock().lock();
+      ((Table) table).takeReadLock();
       try {
         result.addToTables_and_views(((Table) table).toThrift());
       } finally {
-        ((Table) table).getLock().unlock();
+        ((Table) table).releaseReadLock();
       }
     }
     return result;
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java 
b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index 1294d82..d232f0a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -505,13 +505,13 @@ public abstract class Catalog implements AutoCloseable {
           throw new CatalogException("Table not found: " +
               objectDesc.getTable().getTbl_name());
         }
-        table.getLock().lock();
+        table.takeReadLock();
         try {
           result.setType(table.getCatalogObjectType());
           result.setCatalog_version(table.getCatalogVersion());
           result.setTable(table.toThrift());
         } finally {
-          table.getLock().unlock();
+          table.releaseReadLock();
         }
         break;
       }
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 23473e6..704bac4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
@@ -206,8 +207,10 @@ public class CatalogServiceCatalog extends Catalog {
 
   private static final int INITIAL_META_STORE_CLIENT_POOL_SIZE = 10;
   private static final int MAX_NUM_SKIPPED_TOPIC_UPDATES = 2;
-  // Timeout for acquiring a table lock
-  // TODO: Make this configurable
+  private final int maxSkippedUpdatesLockContention_;
+  //value of timeout for the topic update thread while waiting on the table 
lock.
+  private final long topicUpdateTblLockMaxWaitTimeMs_;
+  //Default value of timeout for acquiring a table lock.
   private static final long LOCK_RETRY_TIMEOUT_MS = 7200000;
   // Time to sleep before retrying to acquire a table lock
   private static final int LOCK_RETRY_DELAY_MS = 10;
@@ -315,6 +318,14 @@ public class CatalogServiceCatalog extends Catalog {
         BackendConfig.INSTANCE.getBlacklistedDbs(), LOG);
     blacklistedTables_ = CatalogBlacklistUtils.parseBlacklistedTables(
         BackendConfig.INSTANCE.getBlacklistedTables(), LOG);
+    maxSkippedUpdatesLockContention_ = BackendConfig.INSTANCE
+        .getBackendCfg().catalog_max_lock_skipped_topic_updates;
+    Preconditions.checkState(maxSkippedUpdatesLockContention_ > 0,
+        "catalog_max_lock_skipped_topic_updates must be positive");
+    topicUpdateTblLockMaxWaitTimeMs_ = BackendConfig.INSTANCE
+        .getBackendCfg().topic_update_tbl_max_wait_time_ms;
+    Preconditions.checkState(topicUpdateTblLockMaxWaitTimeMs_ >= 0,
+        "topic_update_tbl_max_wait_time_ms must be positive");
     catalogServiceId_ = catalogServiceId;
     tableLoadingMgr_ = new TableLoadingMgr(this, numLoadingThreads);
     loadInBackground_ = loadInBackground;
@@ -426,30 +437,45 @@ public class CatalogServiceCatalog extends Catalog {
    * held when the function returns. Returns false otherwise and no lock is 
held in
    * this case.
    */
-  public boolean tryLockTable(Table tbl) {
+  public boolean tryWriteLock(Table tbl) {
+    return tryLock(tbl, true, LOCK_RETRY_TIMEOUT_MS);
+  }
+
+  /**
+   * Tries to acquire the table similar to described in
+   * {@link CatalogServiceCatalog#tryWriteLock(Table)} but with a custom 
timeout.
+   */
+  public boolean tryLock(Table tbl, final boolean useWriteLock, final long 
timeout) {
+    Preconditions.checkArgument(timeout > 0);
     try (ThreadNameAnnotator tna = new ThreadNameAnnotator(
-        "Attempting to lock table " + tbl.getFullName())) {
+        String.format("Attempting to %s lock table %s with a timeout of %s ms",
+            (useWriteLock ? "write" : "read"), tbl.getFullName(), timeout))) {
       long begin = System.currentTimeMillis();
       long end;
       do {
         versionLock_.writeLock().lock();
-        if (tbl.getLock().tryLock()) {
-          if (LOG.isTraceEnabled()) {
-            end = System.currentTimeMillis();
-            LOG.trace(String.format("Lock for table %s was acquired in %d 
msec",
-                tbl.getFullName(), end - begin));
-          }
-          return true;
-        }
-        versionLock_.writeLock().unlock();
+        Lock lock = useWriteLock ? tbl.writeLock() : tbl.readLock();
         try {
+          //Note that we don't use the timeout directly here since the timeout
+          //since we don't want to unnecessarily hold the versionLock if the 
table
+          //cannot be acquired. Holding version lock can potentially blocks 
other
+          //unrelated DDL operations.
+          if (lock.tryLock(0, TimeUnit.SECONDS)) {
+            if (LOG.isTraceEnabled()) {
+              end = System.currentTimeMillis();
+              LOG.trace(String.format("Lock for table %s was acquired in %d 
msec",
+                  tbl.getFullName(), end - begin));
+            }
+            return true;
+          }
+          versionLock_.writeLock().unlock();
           // Sleep to avoid spinning and allow other operations to make 
progress.
           Thread.sleep(LOCK_RETRY_DELAY_MS);
         } catch (InterruptedException e) {
           // ignore
         }
         end = System.currentTimeMillis();
-      } while (end - begin < LOCK_RETRY_TIMEOUT_MS);
+      } while (end - begin < timeout);
       return false;
     }
   }
@@ -638,7 +664,7 @@ public class CatalogServiceCatalog extends Catalog {
 
     Map<String, ByteBuffer> stats = new HashMap<>();
     HdfsTable hdfsTable = (HdfsTable) table;
-    hdfsTable.getLock().lock();
+    hdfsTable.takeReadLock();
     try {
       Collection<? extends PrunablePartition> partitions = 
hdfsTable.getPartitions();
       for (PrunablePartition partition : partitions) {
@@ -652,7 +678,7 @@ public class CatalogServiceCatalog extends Catalog {
         }
       }
     } finally {
-      hdfsTable.getLock().unlock();
+      hdfsTable.releaseReadLock();
     }
     LOG.info("Fetched partition statistics for " + stats.size()
         + " partitions on: " + hdfsTable.getFullName());
@@ -694,7 +720,7 @@ public class CatalogServiceCatalog extends Catalog {
       String key = Catalog.toCatalogObjectKey(obj);
       if (obj.type != TCatalogObjectType.CATALOG) {
         topicUpdateLog_.add(key,
-            new TopicUpdateLog.Entry(0, obj.getCatalog_version(), toVersion));
+            new TopicUpdateLog.Entry(0, obj.getCatalog_version(), toVersion, 
0));
         if (!delete) updatedCatalogObjects.add(key);
       }
       // TODO: TSerializer.serialize() returns a copy of the internal byte 
array, which
@@ -983,7 +1009,7 @@ public class CatalogServiceCatalog extends Catalog {
     }
     // we should acquire the table lock so that we wait for any other updates
     // happening to this table at the same time
-    if (!tryLockTable(tbl)) {
+    if (!tryWriteLock(tbl)) {
       throw new CatalogException(String.format("Error during self-event 
evaluation "
           + "for table %s due to lock contention", tbl.getFullName()));
     }
@@ -1024,7 +1050,7 @@ public class CatalogServiceCatalog extends Catalog {
         return failingPartitions.isEmpty();
       }
     } finally {
-      tbl.getLock().unlock();
+      tbl.releaseWriteLock();
     }
     return false;
   }
@@ -1279,24 +1305,162 @@ public class CatalogServiceCatalog extends Catalog {
    */
   private void addTableToCatalogDelta(Table tbl, GetCatalogDeltaContext ctx)
       throws TException  {
-    if (tbl.getCatalogVersion() <= ctx.toVersion) {
-      addTableToCatalogDeltaHelper(tbl, ctx);
+    TopicUpdateLog.Entry topicUpdateEntry =
+        topicUpdateLog_.getOrCreateLogEntry(tbl.getUniqueName());
+    Preconditions.checkNotNull(topicUpdateEntry);
+    // it is important to get the table version once and then use it for 
following logic
+    // since we don't have the table lock yet and the table could be changed 
during the
+    // execution below.
+    final long tblVersion = tbl.getCatalogVersion();
+    if (tblVersion <= ctx.toVersion) {
+      // if we have already skipped this table due to lock contention
+      // maxSkippedUpdatesLockContention number of times, we must block until 
we add it
+      // to the topic updates. Otherwise, we can attempt to take a lock with a 
timeout.
+      // if the topicUpdateTblLockMaxWaitTimeMs is set to 0, it means the lock 
timeout
+      // is disabled and we should just block here until lock is acquired.
+      boolean lockWithTimeout = topicUpdateTblLockMaxWaitTimeMs_ > 0
+          && topicUpdateEntry.getNumSkippedUpdatesLockContention()
+          < maxSkippedUpdatesLockContention_;
+      if (topicUpdateTblLockMaxWaitTimeMs_ > 0 && !lockWithTimeout) {
+        LOG.warn("Topic update thread blocking until lock is acquired for 
table {}",
+            tbl.getFullName());
+      }
+      lockTableAndAddToCatalogDelta(tblVersion, tbl, ctx, lockWithTimeout);
     } else {
-      TopicUpdateLog.Entry topicUpdateEntry =
-          topicUpdateLog_.getOrCreateLogEntry(tbl.getUniqueName());
-      Preconditions.checkNotNull(topicUpdateEntry);
+      // tbl is outside the current topic update window. For a fast changing 
table, it is
+      // possible that this tbl is starved and never added to topic updates. 
Hence we
+      // see how many times the table was skipped before.
       if (topicUpdateEntry.getNumSkippedTopicUpdates() == 
MAX_NUM_SKIPPED_TOPIC_UPDATES) {
-        addTableToCatalogDeltaHelper(tbl, ctx);
+        LOG.warn("Topic update thread blocking until lock is acquired for 
table {} "
+                + "since the table was already skipped {} number of times",
+            tbl.getFullName(), MAX_NUM_SKIPPED_TOPIC_UPDATES);
+        lockTableAndAddToCatalogDelta(tblVersion, tbl, ctx, false);
       } else {
         LOG.info("Table {} (version={}) is skipping topic update ({}, {}]",
-            tbl.getFullName(), tbl.getCatalogVersion(), ctx.fromVersion, 
ctx.toVersion);
+            tbl.getFullName(), tblVersion, ctx.fromVersion, ctx.toVersion);
         topicUpdateLog_.add(tbl.getUniqueName(),
             new TopicUpdateLog.Entry(
                 topicUpdateEntry.getNumSkippedTopicUpdates() + 1,
                 topicUpdateEntry.getLastSentVersion(),
-                topicUpdateEntry.getLastSentCatalogUpdate()));
+                topicUpdateEntry.getLastSentCatalogUpdate(),
+                topicUpdateEntry.getNumSkippedUpdatesLockContention()));
+      }
+    }
+  }
+
+  /**
+   * This method takes a lock on the table and adds it to the
+   * {@link GetCatalogDeltaContext} which is eventually sent via the topic 
updates. A lock
+   * on table essentially blocks other concurrent catalog operations on the 
table. Also,
+   * if the table is a {@link HdfsTable} and it is already locked, this method 
may or may
+   * not block until the table lock is acquired depending on whether 
lockWithTimeout
+   * parameter is false or not.
+   * When the lockWithTimeout is true this method attempts to acquire a lock 
with a
+   * timeout specified by {@code topicUpdateTblLockMaxWaitTimeMs}. If the lock 
is acquired
+   * within the timeout, it continues ahead and adds the table to the ctx. 
However, if
+   * the lock is not acquired within the timeout, it skips the table and 
increments
+   * the counter in {@link TopicUpdateLog.Entry}.
+   * @param tblVersion The table version at the time when topic update 
evaluates if the
+   *                   table needs to be in this update or not.
+   * @param tbl The table object which needs to added to the topic update.
+   * @param ctx GetCatalogDeltaContext where the table is add if the lock is 
acquired.
+   * @param lockWithTimeout If this is true, the method attempts to lock with 
a timeout
+   *                        else, it blocks until lock is acquired.
+   */
+  private void lockTableAndAddToCatalogDelta(final long tblVersion, Table tbl,
+      GetCatalogDeltaContext ctx, boolean lockWithTimeout) throws TException {
+    Stopwatch sw = Stopwatch.createStarted();
+    if (tbl instanceof HdfsTable && lockWithTimeout) {
+      if (!lockHdfsTblWithTimeout(tblVersion, (HdfsTable) tbl, ctx)) return;
+    } else {
+      // this is not HdfsTable or lockWithTimeout is false.
+      // We block until table read lock is acquired.
+      tbl.takeReadLock();
+    }
+    long elapsedTime = sw.stop().elapsed(TimeUnit.MILLISECONDS);
+    if (elapsedTime > 2000) {
+      LOG.debug("Time taken to acquire read lock on table {} for topic update 
{} ms",
+          tbl.getFullName(), elapsedTime);
+    }
+    try {
+      addTableToCatalogDeltaHelper(tbl, ctx);
+    } finally {
+      tbl.releaseReadLock();
+    }
+  }
+
+  /**
+   * Attempts to take a read lock on the give HdfsTable within a configurable 
timeout
+   * of {@code topicUpdateTblLockMaxWaitTimeMs}.
+   * @param tblVersion The version of the table when topic update thread 
inspects the
+   *                   table to be added to the catalog topic updates.
+   * @param hdfsTable The table to be read locked.
+   * @param ctx The current {@link GetCatalogDeltaContext} for this topic 
update.
+   * @return true if the table was successfully read-locked, false otherwise.
+   */
+  private boolean lockHdfsTblWithTimeout(long tblVersion, HdfsTable hdfsTable,
+      GetCatalogDeltaContext ctx) {
+    // see the comment below on why we need 2 attempts.
+    final int maxAttempts = 2;
+    int attemptCount = 0;
+    boolean lockAcquired;
+    do {
+      attemptCount++;
+      // topicUpdateTblLockMaxWaitTimeMs indicates the total amount of time we 
are willing
+      // to wait to acquire the table lock. We make 2 attempts and hence the 
timeout here
+      // is topicUpdateTblLockMaxWaitTimeMs/2 so that overall the method waits 
for
+      // maximum of topicUpdateTblLockMaxWaitTimeMs for the lock.
+      long timeoutMs = topicUpdateTblLockMaxWaitTimeMs_ /maxAttempts;
+      lockAcquired = tryLock(hdfsTable, false, timeoutMs);
+      if (lockAcquired) {
+        // table lock was successfully acquired. We can now release the 
versionLock.
+        versionLock_.writeLock().unlock();
+        return true;
       }
+      // If we reach here, the topic update thread could not take a lock on 
this table.
+      // We should bump up the pending table version so that this gets 
included in the
+      // next topic update. However, there is a race condition which needs to 
be handled
+      // here. If we update the pending version here
+      // after hdfsTable.setCatalogVersion() is called from CatalogOpExecutor, 
the bump up
+      // of pendingVersion takes no effect. Hence we detect such case by 
comparing the
+      // tbl version with the expected tblVersion. if the tblVersion does not 
match, it
+      // means that the setCatalogVersion has already been called and there is 
no point
+      // in bumping up the pending version now. We retry to take a lock on the 
tbl again
+      // in such case. If we cannot get a read lock in attempt 2, it means 
that we were
+      // unlucky again and some other write operation has acquired the tbl 
lock. In such
+      // a case it is guaranteed that the tbl version will be updated outside 
current
+      // window of the topic updates and it is safe to skip the table.
+      boolean pendingVersionUpdated = hdfsTable
+          .updatePendingVersion(tblVersion, incrementAndGetCatalogVersion());
+      if (pendingVersionUpdated) break;
+      // if pendingVersionUpdated is false it means that tblVersion has been 
changed
+      // and hence we didn't update the pendingVersion. We retry once to 
acquire a read
+      // lock.
+    } while (attemptCount != maxAttempts);
+    // lock could not be acquired, we update the skip count in the topicUpdate 
entry
+    // if applicable.
+    TopicUpdateLog.Entry topicUpdateEntry = topicUpdateLog_
+        .getOrCreateLogEntry(hdfsTable.getUniqueName());
+    LOG.info(
+        "Table {} (version={}, lastSeen={}) is skipping topic update ({}, {}] "
+            + "due to lock contention", hdfsTable.getFullName(), tblVersion,
+        hdfsTable.getLastVersionSeenByTopicUpdate(), ctx.fromVersion, 
ctx.toVersion);
+    if (hdfsTable.getLastVersionSeenByTopicUpdate() != tblVersion) {
+      // if the last version skipped by topic update is not same as the last 
version
+      // sent, it means the table was updated and topic update thread is 
lagging
+      // behind.
+      topicUpdateLog_.add(hdfsTable.getUniqueName(),
+          new TopicUpdateLog.Entry(
+              topicUpdateEntry.getNumSkippedTopicUpdates(),
+              topicUpdateEntry.getLastSentVersion(),
+              topicUpdateEntry.getLastSentCatalogUpdate(),
+              topicUpdateEntry.getNumSkippedUpdatesLockContention() + 1));
+      // we keep track of the table version when topic update thread had to 
skip the
+      // table from updates so that next iteration can determine if we need to
+      // increment the lock contention counter in topic update entry again.
+      hdfsTable.setLastVersionSeenByTopicUpdate(tblVersion);
     }
+    return false;
   }
 
   /**
@@ -1306,44 +1470,47 @@ public class CatalogServiceCatalog extends Catalog {
    */
   private void addTableToCatalogDeltaHelper(Table tbl, GetCatalogDeltaContext 
ctx)
       throws TException {
+    Preconditions.checkState(tbl.isReadLockedByCurrentThread(),
+        "Topic update thread does not hold a lock on table " + 
tbl.getFullName()
+            + " while generating catalog delta");
     TCatalogObject catalogTbl =
         new TCatalogObject(TABLE, Catalog.INITIAL_CATALOG_VERSION);
-    tbl.getLock().lock();
+    long tblVersion = tbl.getCatalogVersion();
+    if (tblVersion <= ctx.fromVersion) {
+      LOG.trace("Table {} version {} skipping the update ({}, {}]",
+          tbl.getFullName(), tbl.getCatalogVersion(), ctx.fromVersion, 
ctx.toVersion);
+      return;
+    }
+    String tableUniqueName = tbl.getUniqueName();
+    TopicUpdateLog.Entry topicUpdateEntry =
+        topicUpdateLog_.getOrCreateLogEntry(tableUniqueName);
+    if (tblVersion > ctx.toVersion &&
+        topicUpdateEntry.getNumSkippedTopicUpdates() < 
MAX_NUM_SKIPPED_TOPIC_UPDATES) {
+      LOG.info("Table " + tbl.getFullName() + " is skipping topic update " +
+          ctx.toVersion);
+      topicUpdateLog_.add(tableUniqueName,
+          new TopicUpdateLog.Entry(
+              topicUpdateEntry.getNumSkippedTopicUpdates() + 1,
+              topicUpdateEntry.getLastSentVersion(),
+              topicUpdateEntry.getLastSentCatalogUpdate(),
+              topicUpdateEntry.getNumSkippedUpdatesLockContention()));
+      return;
+    }
     try {
-      long tblVersion = tbl.getCatalogVersion();
-      if (tblVersion <= ctx.fromVersion) return;
-      String tableUniqueName = tbl.getUniqueName();
-      TopicUpdateLog.Entry topicUpdateEntry =
-          topicUpdateLog_.getOrCreateLogEntry(tableUniqueName);
-      if (tblVersion > ctx.toVersion &&
-          topicUpdateEntry.getNumSkippedTopicUpdates() < 
MAX_NUM_SKIPPED_TOPIC_UPDATES) {
-        LOG.info("Table " + tbl.getFullName() + " is skipping topic update " +
-            ctx.toVersion);
-        topicUpdateLog_.add(tableUniqueName,
-            new TopicUpdateLog.Entry(
-                topicUpdateEntry.getNumSkippedTopicUpdates() + 1,
-                topicUpdateEntry.getLastSentVersion(),
-                topicUpdateEntry.getLastSentCatalogUpdate()));
-        return;
-      }
-      try {
-        if (BackendConfig.INSTANCE.isIncrementalMetadataUpdatesEnabled()
-            && tbl instanceof HdfsTable) {
-          catalogTbl.setTable(((HdfsTable) 
tbl).toThriftWithMinimalPartitions());
-          addHdfsPartitionsToCatalogDelta((HdfsTable) tbl, ctx);
-        } else {
-          catalogTbl.setTable(tbl.toThrift());
-        }
-      } catch (Exception e) {
-        LOG.error(String.format("Error calling toThrift() on table %s: %s",
-            tbl.getFullName(), e.getMessage()), e);
-        return;
+      if (BackendConfig.INSTANCE.isIncrementalMetadataUpdatesEnabled()
+          && tbl instanceof HdfsTable) {
+        catalogTbl.setTable(((HdfsTable) tbl).toThriftWithMinimalPartitions());
+        addHdfsPartitionsToCatalogDelta((HdfsTable) tbl, ctx);
+      } else {
+        catalogTbl.setTable(tbl.toThrift());
       }
-      catalogTbl.setCatalog_version(tbl.getCatalogVersion());
-      ctx.addCatalogObject(catalogTbl, false);
-    } finally {
-      tbl.getLock().unlock();
+    } catch (Exception e) {
+      LOG.error(String.format("Error calling toThrift() on table %s: %s",
+          tbl.getFullName(), e.getMessage()), e);
+      return;
     }
+    catalogTbl.setCatalog_version(tbl.getCatalogVersion());
+    ctx.addCatalogObject(catalogTbl, false);
   }
 
   private void addHdfsPartitionsToCatalogDelta(HdfsTable hdfsTable,
@@ -2064,6 +2231,11 @@ public class CatalogServiceCatalog extends Catalog {
         deleteLog_.addRemovedObject(existingTbl.toMinimalTCatalogObject());
       }
       updatedTbl.setCatalogVersion(incrementAndGetCatalogVersion());
+      // note that we update the db with a new instance of table. In such case
+      // we may lose some temporary state stored in the old table like 
pendingVersion
+      // This is okay since the table version is bumped up to the next topic 
update window
+      // and eventually the table will need to added to the topic update if 
hits the
+      // MAX_NUM_SKIPPED_TOPIC_UPDATES limit.
       db.addTable(updatedTbl);
       return updatedTbl;
     } finally {
@@ -2297,7 +2469,7 @@ public class CatalogServiceCatalog extends Catalog {
     Preconditions.checkState(!(tbl instanceof IncompleteTable));
     String dbName = tbl.getDb().getName();
     String tblName = tbl.getName();
-    if (!tryLockTable(tbl)) {
+    if (!tryWriteLock(tbl)) {
       throw new CatalogException(String.format("Error refreshing metadata for 
table " +
           "%s due to lock contention", tbl.getFullName()));
     }
@@ -2328,7 +2500,7 @@ public class CatalogServiceCatalog extends Catalog {
     } finally {
       context.stop();
       Preconditions.checkState(!versionLock_.isWriteLockedByCurrentThread());
-      tbl.getLock().unlock();
+      tbl.releaseWriteLock();
     }
   }
 
@@ -2340,7 +2512,7 @@ public class CatalogServiceCatalog extends Catalog {
       throws CatalogException {
     Preconditions.checkNotNull(tbl);
     Preconditions.checkNotNull(partitionSet);
-    Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread());
+    Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
     if (!(tbl instanceof HdfsTable)) {
       throw new CatalogException("Table " + tbl.getFullName() + " is not an 
Hdfs table");
     }
@@ -2415,11 +2587,11 @@ public class CatalogServiceCatalog extends Catalog {
         Table result = removeTable(dbName, tblName);
         if (result == null) return null;
         tblWasRemoved.setRef(true);
-        result.getLock().lock();
+        result.takeReadLock();
         try {
           return result.toTCatalogObject();
         } finally {
-          result.getLock().unlock();
+          result.releaseReadLock();
         }
       }
 
@@ -2844,7 +3016,7 @@ public class CatalogServiceCatalog extends Catalog {
   public TCatalogObject reloadPartition(Table tbl, List<TPartitionKeyValue> 
partitionSpec,
       Reference<Boolean> wasPartitionReloaded, CatalogObject.ThriftObjectType 
resultType,
       String reason) throws CatalogException {
-    if (!tryLockTable(tbl)) {
+    if (!tryWriteLock(tbl)) {
       throw new CatalogException(String.format("Error reloading partition of 
table %s " +
           "due to lock contention", tbl.getFullName()));
     }
@@ -2894,7 +3066,7 @@ public class CatalogServiceCatalog extends Catalog {
       return hdfsTable.toTCatalogObject(resultType);
     } finally {
       Preconditions.checkState(!versionLock_.isWriteLockedByCurrentThread());
-      tbl.getLock().unlock();
+      tbl.releaseWriteLock();
     }
   }
 
@@ -3111,7 +3283,7 @@ public class CatalogServiceCatalog extends Catalog {
           ". Table not yet loaded.";
       return result;
     }
-    if (!tbl.getLock().tryLock()) {
+    if (!tbl.tryReadLock()) {
       result = "Metrics for table " + dbName + "." + tblName + "are not 
available " +
           "because the table is currently modified by another operation.";
       return result;
@@ -3119,7 +3291,7 @@ public class CatalogServiceCatalog extends Catalog {
     try {
       return tbl.getMetrics().toString();
     } finally {
-      tbl.getLock().unlock();
+      tbl.releaseReadLock();
     }
   }
 
@@ -3235,8 +3407,7 @@ public class CatalogServiceCatalog extends Catalog {
       }
       Map<HdfsPartition, TPartialPartitionInfo> missingPartialInfos;
       TGetPartialCatalogObjectResponse resp;
-      // TODO(todd): consider a read-write lock here.
-      table.getLock().lock();
+      table.takeReadLock();
       try {
         if (table instanceof HdfsTable || table instanceof IcebergTable) {
           HdfsTable hdfsTable = table instanceof HdfsTable ? (HdfsTable) table 
:
@@ -3252,7 +3423,7 @@ public class CatalogServiceCatalog extends Catalog {
           return table.getPartialInfo(req);
         }
       } finally {
-        table.getLock().unlock();
+        table.releaseReadLock();
       }
     }
     case FUNCTION: {
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 50fbde7..3d3b0c3 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -841,7 +841,7 @@ public class HdfsPartition extends CatalogObjectImpl
    */
   public boolean removeFromVersionsForInflightEvents(
       boolean isInsertEvent, long versionNumber) {
-    Preconditions.checkState(table_.getLock().isHeldByCurrentThread(),
+    Preconditions.checkState(table_.isWriteLockedByCurrentThread(),
         "removeFromVersionsForInflightEvents called without holding the table 
lock on "
             + "partition " + getPartitionName() + " of table " + 
table_.getFullName());
     return inFlightEvents_.remove(isInsertEvent, versionNumber);
@@ -855,7 +855,7 @@ public class HdfsPartition extends CatalogObjectImpl
    *                      when isInsertEvent is false, it's version number to 
add
    */
   public void addToVersionsForInflightEvents(boolean isInsertEvent, long 
versionNumber) {
-    Preconditions.checkState(table_.getLock().isHeldByCurrentThread(),
+    Preconditions.checkState(table_.isWriteLockedByCurrentThread(),
         "addToVersionsForInflightEvents called without holding the table lock 
on "
             + "partition " + getPartitionName() + " of table " + 
table_.getFullName());
     if (!inFlightEvents_.add(isInsertEvent, versionNumber)) {
@@ -1452,7 +1452,7 @@ public class HdfsPartition extends CatalogObjectImpl
      */
     public void addToVersionsForInflightEvents(boolean isInsertEvent,
         long versionNumber) {
-      Preconditions.checkState(table_.getLock().isHeldByCurrentThread(),
+      Preconditions.checkState(table_.isWriteLockedByCurrentThread(),
           "addToVersionsForInflightEvents called without holding the table 
lock on "
               + "partition " + getPartitionName() + " of table " + 
table_.getFullName());
       if (!inFlightEvents_.add(isInsertEvent, versionNumber)) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 77825cc..05520e2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -295,6 +295,21 @@ public class HdfsTable extends Table implements FeFsTable {
   // in coordinator's cache if there are no updates on them.
   private final Set<HdfsPartition> droppedPartitions_ = new HashSet<>();
 
+  // pendingVersionNumber indicates a version number allocated to this 
HdfsTable for an
+  // ongoing DDL operation. This is mainly used by the topic update thread to 
skip a
+  // table from the topic updates if it cannot acquire a lock on this table. The 
topic
+  // update thread bumps this up to a higher value outside the topic window so 
that
+  // the table is considered in the next update. The setCatalogVersion() makes 
use of this
+  // to eventually assign the catalogVersion to the table.
+  private long pendingVersionNumber_ = -1;
+  // lock protecting access to pendingVersionNumber
+  private final Object pendingVersionLock_ = new Object();
+
+  // this field is used to keep track of the last table version which is seen 
by the
+  // topic update thread. It is primarily used to identify distinct locking 
operations
+  // to determine if we can skip the table from the topic update.
+  private long lastVersionSeenByTopicUpdate_ = -1;
+
   // Represents a set of storage-related statistics aggregated at the table or 
partition
   // level.
   public final static class FileMetadataStats {
@@ -2647,4 +2662,63 @@ public class HdfsTable extends Table implements 
FeFsTable {
   public ValidWriteIdList getValidWriteIds() {
     return validWriteIds_;
   }
+
+  /**
+   * Updates the pending version of this table if the tbl version matches with 
the
+   * expectedTblVersion.
+   * @return true if the pending version was updated. Else, false.
+   */
+  public boolean updatePendingVersion(long expectedTblVersion, long 
newPendingVersion) {
+    synchronized (pendingVersionLock_) {
+      if (expectedTblVersion == getCatalogVersion()) {
+        pendingVersionNumber_ = newPendingVersion;
+        return true;
+      }
+      return false;
+    }
+  }
+
+  /**
+   * Sets the version of this table. This makes sure that if there is a
+   * pendingVersionNumber which is higher than the given version, it uses the
+   * pendingVersionNumber. A pendingVersionNumber which is higher than given 
version
+   * represents that the topic update thread tried to add this table to an 
update but
+   * couldn't. Hence it needs the ongoing update operation (represented by the 
given
+   * version) to use a higher version number so that this table falls within 
the next
+   * topic update window.
+   */
+  @Override
+  public void setCatalogVersion(long version) {
+    synchronized (pendingVersionLock_) {
+      long versionToBeSet = version;
+      if (pendingVersionNumber_ > version) {
+        LOG.trace("Pending table version {} is higher than requested version 
{}",
+            pendingVersionNumber_, version);
+        versionToBeSet = pendingVersionNumber_;
+      }
+      LOG.trace("Setting the hdfs table {} version {}", getFullName(), 
versionToBeSet);
+      super.setCatalogVersion(versionToBeSet);
+    }
+  }
+
+  /**
+   * Returns the last version of this table which was seen by the topic update 
thread
+   * when it could not acquire the table lock. This is used to determine if 
the topic
+   * update thread has skipped this table enough number of times that we 
should now
+   * block the topic updates until we add this table. Note that
+   * this method is not thread-safe and assumes that this is only called from the
+   * topic-update thread.
+   */
+  public long getLastVersionSeenByTopicUpdate() {
+    return lastVersionSeenByTopicUpdate_;
+  }
+
+  /**
+   * Sets the version as seen by the topic update thread if it skips the 
table. Note that
+   * this method is not thread-safe and assumes that this is only called from the
+   * topic-update thread.
+   */
+  public void setLastVersionSeenByTopicUpdate(long version) {
+    lastVersionSeenByTopicUpdate_ = version;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java 
b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 3eb7802..910bd49 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -25,8 +25,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
-
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -78,8 +79,14 @@ public abstract class Table extends CatalogObjectImpl 
implements FeTable {
   protected final String name_;
   protected final String owner_;
   protected TAccessLevel accessLevel_ = TAccessLevel.READ_WRITE;
-  // Lock protecting this table
-  private final ReentrantLock tableLock_ = new ReentrantLock();
+  // Lock protecting this table. A read lock must be taken when we are 
serializing
+  // the table contents over thrift (e.g. when returning the table to clients 
over thrift
+  // or when the topic-update thread serializes the table in the topic update).
+  // A write lock must be taken when the table is being modified (e.g. DDLs or 
refresh)
+  private final ReentrantReadWriteLock tableLock_ = new ReentrantReadWriteLock(
+      true /*fair ordering*/);
+  private final ReadLock readLock_ = tableLock_.readLock();
+  private final WriteLock writeLock_ = tableLock_.writeLock();
 
   // Number of clustering columns.
   protected int numClusteringCols_;
@@ -203,7 +210,52 @@ public abstract class Table extends CatalogObjectImpl 
implements FeTable {
         
.parseBoolean(msTbl.getParameters().get(TBL_PROP_EXTERNAL_TABLE_PURGE));
   }
 
-  public ReentrantLock getLock() { return tableLock_; }
+  public void takeReadLock() {
+    readLock_.lock();
+  }
+
+  public ReadLock readLock() { return readLock_; }
+  public WriteLock writeLock() { return writeLock_; }
+
+  public void releaseReadLock() {
+    readLock_.unlock();
+  }
+
+  public boolean isReadLockedByCurrentThread() {
+    return tableLock_.getReadHoldCount() > 0;
+  }
+
+  public boolean tryReadLock() {
+    try {
+      // a tryLock with a 0 timeout honors the fairness for lock acquisition
+      // in case there are other threads waiting to acquire a read lock when
+      // compared to tryLock() which "barges" in and takes a read lock if
+      // available with no consideration to other threads waiting for a read 
lock.
+      return readLock_.tryLock(0, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      // ignored
+    }
+    return false;
+  }
+
+  public void releaseWriteLock() {
+    writeLock_.unlock();
+  }
+
+  /**
+   * Returns true if this table is write locked by current thread.
+   */
+  public boolean isWriteLockedByCurrentThread() {
+    return writeLock_.isHeldByCurrentThread();
+  }
+
+  /**
+   * Returns true if this table is write locked by any thread.
+   */
+  public boolean isWriteLocked() {
+    return tableLock_.isWriteLocked();
+  }
+
   @Override
   public abstract TTableDescriptor toThriftDescriptor(
       int tableId, Set<Long> referencedPartitions);
@@ -492,7 +544,7 @@ public abstract class Table extends CatalogObjectImpl 
implements FeTable {
     // the table lock should already be held, and we want the toThrift() to be 
consistent
     // with the modification. So this check helps us identify places where the 
lock
     // acquisition is probably missing entirely.
-    if (!tableLock_.isHeldByCurrentThread()) {
+    if (!isLockedByCurrentThread()) {
       throw new IllegalStateException(
           "Table.toThrift() called without holding the table lock: " +
               getFullName() + " " + getClass().getName());
@@ -524,6 +576,10 @@ public abstract class Table extends CatalogObjectImpl 
implements FeTable {
     return table;
   }
 
+  private boolean isLockedByCurrentThread() {
+    return isReadLockedByCurrentThread() || 
tableLock_.isWriteLockedByCurrentThread();
+  }
+
   public TCatalogObject toMinimalTCatalogObject() {
     return toMinimalTCatalogObjectHelper();
   }
@@ -809,7 +865,7 @@ public abstract class Table extends CatalogObjectImpl 
implements FeTable {
    */
   public boolean removeFromVersionsForInflightEvents(
       boolean isInsertEvent, long versionNumber) {
-    Preconditions.checkState(tableLock_.isHeldByCurrentThread(),
+    Preconditions.checkState(isWriteLockedByCurrentThread(),
         "removeFromVersionsForInFlightEvents called without taking the table 
lock on "
             + getFullName());
     return inFlightEvents.remove(isInsertEvent, versionNumber);
@@ -831,7 +887,7 @@ public abstract class Table extends CatalogObjectImpl 
implements FeTable {
     // we generally don't take locks on Incomplete tables since they are 
atomically
     // replaced during load
     Preconditions.checkState(
-        this instanceof IncompleteTable || tableLock_.isHeldByCurrentThread());
+        this instanceof IncompleteTable || isWriteLockedByCurrentThread());
     if (!inFlightEvents.add(isInsertEvent, versionNumber)) {
       LOG.warn(String.format("Could not add %s version to the table %s. This 
could "
           + "cause unnecessary refresh of the table when the event is received 
by the "
diff --git a/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java 
b/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java
index be80b3b..040e0cf 100644
--- a/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java
@@ -55,7 +55,8 @@ public class TopicUpdateLog {
   // associated with a catalog object and stores information about the last 
topic update
   // that processed that object.
   public static class Entry {
-    // Number of times the entry has skipped a topic update.
+    // Number of times the entry has skipped a topic update because the table 
version
+    // was out of the requested update version window.
     private final int numSkippedUpdates_;
     // Last version of the corresponding catalog object that was added to a 
topic
     // update. -1 if the object was never added to a topic update.
@@ -63,30 +64,41 @@ public class TopicUpdateLog {
     // Version of the last topic update to include the corresponding catalog 
object.
     // -1 if the object was never added to a topic update.
     private final long lastSentTopicUpdate_;
+    // number of times the topic update skipped this table due to lock 
contention
+    private final int numSkippedUpdatesLockContention_;
 
     Entry() {
       numSkippedUpdates_ = 0;
       lastSentVersion_ = -1;
       lastSentTopicUpdate_ = -1;
+      numSkippedUpdatesLockContention_ = 0;
     }
 
-    Entry(int numSkippedUpdates, long lastSentVersion, long 
lastSentCatalogUpdate) {
+    Entry(int numSkippedUpdates, long lastSentVersion, long 
lastSentCatalogUpdate,
+        int numSkippedUpdatesLockContention) {
       numSkippedUpdates_ = numSkippedUpdates;
       lastSentVersion_ = lastSentVersion;
       lastSentTopicUpdate_ = lastSentCatalogUpdate;
+      numSkippedUpdatesLockContention_ = numSkippedUpdatesLockContention;
     }
 
     public int getNumSkippedTopicUpdates() { return numSkippedUpdates_; }
     public long getLastSentVersion() { return lastSentVersion_; }
     public long getLastSentCatalogUpdate() { return lastSentTopicUpdate_; }
 
+    public int getNumSkippedUpdatesLockContention() {
+      return numSkippedUpdatesLockContention_;
+    }
+
     @Override
     public boolean equals(Object other) {
       if (this.getClass() != other.getClass()) return false;
       Entry entry = (Entry) other;
       return numSkippedUpdates_ == entry.getNumSkippedTopicUpdates()
           && lastSentVersion_ == entry.getLastSentVersion()
-          && lastSentTopicUpdate_ == entry.getLastSentCatalogUpdate();
+          && lastSentTopicUpdate_ == entry.getLastSentCatalogUpdate()
+          && numSkippedUpdatesLockContention_ == entry
+          .getNumSkippedUpdatesLockContention();
     }
   }
 
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index fcf3cb5..3841dfd 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -98,7 +98,6 @@ import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsPartition;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.HiveStorageDescriptorFactory;
-import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.IncompleteTable;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
@@ -594,11 +593,11 @@ public class CatalogOpExecutor {
         // The table lock is needed here since toTCatalogObject() calls 
Table#toThrift()
         // which expects the current thread to hold this lock. For more 
details refer
         // to IMPALA-4092.
-        t.getLock().lock();
+        t.takeReadLock();
         try {
           response.result.addToUpdated_catalog_objects(t.toTCatalogObject());
         } finally {
-          t.getLock().unlock();
+          t.releaseReadLock();
         }
       }
     }
@@ -666,7 +665,7 @@ public class CatalogOpExecutor {
           String.format("Can't rename to blacklisted table name: %s. %s", 
newTableName,
               BLACKLISTED_DBS_INCONSISTENT_ERR_STR));
     }
-    tryLock(tbl);
+    tryWriteLock(tbl);
     // Get a new catalog version to assign to the table being altered.
     long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
     addCatalogServiceIdentifiers(tbl, catalog_.getCatalogServiceId(), 
newCatalogVersion);
@@ -884,7 +883,7 @@ public class CatalogOpExecutor {
       UnlockWriteLockIfErronouslyLocked();
       // Clear in-progress modifications in case of exceptions.
       tbl.resetInProgressModification();
-      tbl.getLock().unlock();
+      tbl.releaseWriteLock();
     }
   }
 
@@ -906,7 +905,7 @@ public class CatalogOpExecutor {
   private void alterKuduTable(TAlterTableParams params, TDdlExecResponse 
response,
       KuduTable tbl, long newCatalogVersion, boolean wantMinimalResult)
       throws ImpalaException {
-    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
+    Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
     switch (params.getAlter_type()) {
       case ADD_COLUMNS:
         TAlterTableAddColsParams addColParams = params.getAdd_cols_params();
@@ -965,7 +964,7 @@ public class CatalogOpExecutor {
   private void alterIcebergTable(TAlterTableParams params, TDdlExecResponse 
response,
       IcebergTable tbl, long newCatalogVersion, boolean wantMinimalResult)
       throws ImpalaException {
-    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
+    Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
     switch (params.getAlter_type()) {
       case ADD_COLUMNS:
         TAlterTableAddColsParams addColParams = params.getAdd_cols_params();
@@ -1008,7 +1007,7 @@ public class CatalogOpExecutor {
   private void loadTableMetadata(Table tbl, long newCatalogVersion,
       boolean reloadFileMetadata, boolean reloadTableSchema,
       Set<String> partitionsToUpdate, String reason) throws CatalogException {
-    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
+    Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       org.apache.hadoop.hive.metastore.api.Table msTbl =
           getMetaStoreTable(msClient, tbl);
@@ -1071,7 +1070,7 @@ public class CatalogOpExecutor {
         "Load for ALTER VIEW");
     Preconditions.checkState(tbl instanceof View, "Expected view: %s",
         tableName);
-    tryLock(tbl);
+    tryWriteLock(tbl);
     try {
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       catalog_.getLock().writeLock().unlock();
@@ -1102,7 +1101,7 @@ public class CatalogOpExecutor {
       addTableToCatalogUpdate(tbl, wantMinimalResult, resp.result);
     } finally {
       UnlockWriteLockIfErronouslyLocked();
-      tbl.getLock().unlock();
+      tbl.releaseWriteLock();
     }
   }
 
@@ -1136,7 +1135,7 @@ public class CatalogOpExecutor {
       Reference<Long> numUpdatedPartitions, Reference<Long> numUpdatedColumns,
       @Nullable String debugAction)
       throws ImpalaException {
-    Preconditions.checkState(table.getLock().isHeldByCurrentThread());
+    Preconditions.checkState(table.isWriteLockedByCurrentThread());
     Preconditions.checkState(params.isSetTable_stats() || 
params.isSetColumn_stats());
 
     TableName tableName = table.getTableName();
@@ -1610,7 +1609,7 @@ public class CatalogOpExecutor {
     Preconditions.checkState(!AcidUtils.isTransactionalTable(
         table.getMetaStoreTable().getParameters()));
 
-    tryLock(table, "dropping stats");
+    tryWriteLock(table, "dropping stats");
     try {
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       catalog_.getLock().writeLock().unlock();
@@ -1643,7 +1642,7 @@ public class CatalogOpExecutor {
       addSummary(resp, "Stats have been dropped.");
     } finally {
       UnlockWriteLockIfErronouslyLocked();
-      table.getLock().unlock();
+      table.releaseWriteLock();
     }
   }
 
@@ -1652,7 +1651,7 @@ public class CatalogOpExecutor {
    * that were updated as part of this operation.
    */
   private int dropColumnStats(Table table) throws ImpalaRuntimeException {
-    Preconditions.checkState(table.getLock().isHeldByCurrentThread());
+    Preconditions.checkState(table.isWriteLockedByCurrentThread());
     int numColsUpdated = 0;
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       for (Column col: table.getColumns()) {
@@ -1684,7 +1683,7 @@ public class CatalogOpExecutor {
    * is unpartitioned.
    */
   private int dropTableStats(Table table) throws ImpalaException {
-    Preconditions.checkState(table.getLock().isHeldByCurrentThread());
+    Preconditions.checkState(table.isWriteLockedByCurrentThread());
     // Delete the ROW_COUNT from the table (if it was set).
     org.apache.hadoop.hive.metastore.api.Table msTbl = 
table.getMetaStoreTable();
     int numTargetedPartitions = 0;
@@ -2130,7 +2129,7 @@ public class CatalogOpExecutor {
     // If transactional, the lock will be released for some time to acquire 
the HMS Acid
     // lock. It's safe because transactional -> non-transactional conversion 
is not
     // allowed.
-    tryLock(table, "truncating");
+    tryWriteLock(table, "truncating");
     try {
       long newCatalogVersion = 0;
       try {
@@ -2150,8 +2149,8 @@ public class CatalogOpExecutor {
       addTableToCatalogUpdate(table, wantMinimalResult, resp.result);
     } finally {
       UnlockWriteLockIfErronouslyLocked();
-      if (table.getLock().isHeldByCurrentThread()) {
-        table.getLock().unlock();
+      if (table.isWriteLockedByCurrentThread()) {
+        table.releaseWriteLock();
       }
     }
   }
@@ -2166,7 +2165,7 @@ public class CatalogOpExecutor {
    */
   private long truncateTransactionalTable(TTruncateParams params, Table table)
       throws ImpalaException {
-    Preconditions.checkState(table.getLock().isHeldByCurrentThread());
+    Preconditions.checkState(table.isWriteLockedByCurrentThread());
     
Preconditions.checkState(catalog_.getLock().isWriteLockedByCurrentThread());
     catalog_.getLock().writeLock().unlock();
     TableName tblName = TableName.fromThrift(params.getTable_name());
@@ -2180,11 +2179,11 @@ public class CatalogOpExecutor {
         Preconditions.checkState(txn.getId() > 0);
         // We need to release catalog table lock here, because HMS Acid table 
lock
         // must be locked in advance to avoid dead lock.
-        table.getLock().unlock();
+        table.releaseWriteLock();
         //TODO: if possible, set DataOperationType to something better than 
NO_TXN.
         catalog_.lockTableInTransaction(tblName.getDb(), tblName.getTbl(), txn,
             DataOperationType.NO_TXN, ctx);
-        tryLock(table, "truncating");
+        tryWriteLock(table, "truncating");
         newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
         catalog_.getLock().writeLock().unlock();
         TblTransaction tblTxn = MetastoreShim.createTblTransaction(hmsClient,
@@ -2244,7 +2243,7 @@ public class CatalogOpExecutor {
    */
   private boolean isTableBeingReplicated(IMetaStoreClient metastoreClient,
       HdfsTable tbl) throws CatalogException {
-    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
+    Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
     String dbName = tbl.getDb().getName();
     try {
       Database db = metastoreClient.getDatabase(dbName);
@@ -2260,7 +2259,7 @@ public class CatalogOpExecutor {
 
   private long truncateNonTransactionalTable(TTruncateParams params, Table 
table)
       throws Exception {
-    Preconditions.checkState(table.getLock().isHeldByCurrentThread());
+    Preconditions.checkState(table.isWriteLockedByCurrentThread());
     
Preconditions.checkState(catalog_.getLock().isWriteLockedByCurrentThread());
     long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
     catalog_.getLock().writeLock().unlock();
@@ -2399,7 +2398,7 @@ public class CatalogOpExecutor {
       addSummary(response, "Table already exists.");
       LOG.trace("Skipping table creation because {} already exists and " +
           "IF NOT EXISTS was specified.", tableName);
-      tryLock(existingTbl);
+      tryWriteLock(existingTbl);
       try {
         if (syncDdl) {
           // When SYNC_DDL is enabled and the table already exists, we force a 
version
@@ -2421,7 +2420,7 @@ public class CatalogOpExecutor {
       } finally {
         // Release the locks held in tryLock().
         catalog_.getLock().writeLock().unlock();
-        existingTbl.getLock().unlock();
+        existingTbl.releaseWriteLock();
       }
       return false;
     }
@@ -2838,7 +2837,7 @@ public class CatalogOpExecutor {
       addSummary(response, "Table already exists.");
       LOG.trace(String.format("Skipping table creation because %s already 
exists and " +
           "IF NOT EXISTS was specified.", tblName));
-      tryLock(existingTbl);
+      tryWriteLock(existingTbl);
       try {
         if (syncDdl) {
           // When SYNC_DDL is enabled and the table already exists, we force a 
version
@@ -2860,7 +2859,7 @@ public class CatalogOpExecutor {
       } finally {
         // Release the locks held in tryLock().
         catalog_.getLock().writeLock().unlock();
-        existingTbl.getLock().unlock();
+        existingTbl.releaseWriteLock();
       }
       return;
     }
@@ -3000,7 +2999,7 @@ public class CatalogOpExecutor {
    */
   private boolean alterTableAddCols(Table tbl, List<TColumn> columns, boolean 
ifNotExists)
       throws ImpalaException {
-    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
+    Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
     org.apache.hadoop.hive.metastore.api.Table msTbl = 
tbl.getMetaStoreTable().deepCopy();
     List<TColumn> colsToAdd = new ArrayList<>();
     for (TColumn column: columns) {
@@ -3028,7 +3027,7 @@ public class CatalogOpExecutor {
    */
   private void alterTableReplaceCols(Table tbl, List<TColumn> columns)
       throws ImpalaException {
-    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
+    Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
     org.apache.hadoop.hive.metastore.api.Table msTbl = 
tbl.getMetaStoreTable().deepCopy();
     List<FieldSchema> newColumns = buildFieldSchemaList(columns);
     msTbl.getSd().setCols(newColumns);
@@ -3048,7 +3047,7 @@ public class CatalogOpExecutor {
    */
   private void alterTableAlterCol(Table tbl, String colName,
       TColumn newCol) throws ImpalaException {
-    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
+    Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
     org.apache.hadoop.hive.metastore.api.Table msTbl = 
tbl.getMetaStoreTable().deepCopy();
     // Find the matching column name and change it.
     Iterator<FieldSchema> iterator = msTbl.getSd().getColsIterator();
@@ -3096,7 +3095,7 @@ public class CatalogOpExecutor {
    */
   private Table alterTableAddPartitions(Table tbl,
       TAlterTableAddPartitionParams addPartParams) throws ImpalaException {
-    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
+    Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
 
     TableName tableName = tbl.getTableName();
     org.apache.hadoop.hive.metastore.api.Table msTbl = 
tbl.getMetaStoreTable().deepCopy();
@@ -3269,7 +3268,7 @@ public class CatalogOpExecutor {
       List<List<TPartitionKeyValue>> partitionSet,
       boolean ifExists, boolean purge, Reference<Long> numUpdatedPartitions)
       throws ImpalaException {
-    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
+    Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
     Preconditions.checkNotNull(partitionSet);
 
     TableName tableName = tbl.getTableName();
@@ -3323,7 +3322,7 @@ public class CatalogOpExecutor {
    * Removes a column from the given table.
    */
   private void alterTableDropCol(Table tbl, String colName) throws 
ImpalaException {
-    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
+    Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
     org.apache.hadoop.hive.metastore.api.Table msTbl = 
tbl.getMetaStoreTable().deepCopy();
     // Find the matching column name and remove it.
     Iterator<FieldSchema> iterator = msTbl.getSd().getColsIterator();
@@ -3355,7 +3354,7 @@ public class CatalogOpExecutor {
   private void alterTableOrViewRename(Table oldTbl, TableName newTableName,
       long newCatalogVersion, boolean wantMinimalResult, TDdlExecResponse 
response)
       throws ImpalaException {
-    Preconditions.checkState(oldTbl.getLock().isHeldByCurrentThread()
+    Preconditions.checkState(oldTbl.isWriteLockedByCurrentThread()
         && catalog_.getLock().isWriteLockedByCurrentThread());
     TableName tableName = oldTbl.getTableName();
     org.apache.hadoop.hive.metastore.api.Table msTbl =
@@ -3466,7 +3465,7 @@ public class CatalogOpExecutor {
       List<List<TPartitionKeyValue>> partitionSet, THdfsFileFormat fileFormat,
       Reference<Long> numUpdatedPartitions)
       throws ImpalaException {
-    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
+    Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
     boolean reloadFileMetadata = false;
     if (partitionSet == null) {
       org.apache.hadoop.hive.metastore.api.Table msTbl =
@@ -3501,7 +3500,7 @@ public class CatalogOpExecutor {
       List<List<TPartitionKeyValue>> partitionSet, TTableRowFormat tRowFormat,
       Reference<Long> numUpdatedPartitions)
       throws ImpalaException {
-    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
+    Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
     Preconditions.checkArgument(tbl instanceof HdfsTable);
     boolean reloadFileMetadata = false;
     RowFormat rowFormat = RowFormat.fromThrift(tRowFormat);
@@ -3548,7 +3547,7 @@ public class CatalogOpExecutor {
    */
   private boolean alterTableSetLocation(Table tbl,
       List<TPartitionKeyValue> partitionSpec, String location) throws 
ImpalaException {
-    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
+    Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
     boolean reloadFileMetadata = false;
     if (partitionSpec == null) {
       org.apache.hadoop.hive.metastore.api.Table msTbl =
@@ -3578,7 +3577,7 @@ public class CatalogOpExecutor {
   private void alterTableSetTblProperties(Table tbl,
       TAlterTableSetTblPropertiesParams params, Reference<Long> 
numUpdatedPartitions)
       throws ImpalaException {
-    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
+    Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
     Map<String, String> properties = params.getProperties();
     Preconditions.checkNotNull(properties);
     if (params.isSetPartition_set()) {
@@ -3657,7 +3656,7 @@ public class CatalogOpExecutor {
    */
   private boolean alterTableSetCached(Table tbl, TAlterTableSetCachedParams 
params)
       throws ImpalaException {
-    Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread());
+    Preconditions.checkArgument(tbl.isWriteLockedByCurrentThread());
     THdfsCachingOp cacheOp = params.getCache_op();
     Preconditions.checkNotNull(cacheOp);
     // Alter table params.
@@ -3785,7 +3784,7 @@ public class CatalogOpExecutor {
   private void alterPartitionSetCached(Table tbl,
       TAlterTableSetCachedParams params, Reference<Long> numUpdatedPartitions)
       throws ImpalaException {
-    Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread());
+    Preconditions.checkArgument(tbl.isWriteLockedByCurrentThread());
     THdfsCachingOp cacheOp = params.getCache_op();
     Preconditions.checkNotNull(cacheOp);
     Preconditions.checkNotNull(params.getPartition_set());
@@ -3847,7 +3846,7 @@ public class CatalogOpExecutor {
    */
   private void alterTableRecoverPartitions(Table tbl, @Nullable String 
debugAction)
       throws ImpalaException {
-    Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread());
+    Preconditions.checkArgument(tbl.isWriteLockedByCurrentThread());
     if (!(tbl instanceof HdfsTable)) {
       throw new CatalogException("Table " + tbl.getFullName() + " is not an 
HDFS table");
     }
@@ -4493,11 +4492,11 @@ public class CatalogOpExecutor {
             }
           } else {
             // Table was loaded from scratch, so it's already "refreshed".
-            tbl.getLock().lock();
+            tbl.takeReadLock();
             try {
               updatedThriftTable = tbl.toTCatalogObject(resultType);
             } finally {
-              tbl.getLock().unlock();
+              tbl.releaseReadLock();
             }
           }
         }
@@ -4572,7 +4571,7 @@ public class CatalogOpExecutor {
           update.getTarget_table());
     }
 
-    tryLock(table, "updating the catalog");
+    tryWriteLock(table, "updating the catalog");
     final Timer.Context context
         = 
table.getMetrics().getTimer(HdfsTable.CATALOG_UPDATE_DURATION_METRIC).time();
 
@@ -4809,7 +4808,7 @@ public class CatalogOpExecutor {
     } finally {
       context.stop();
       UnlockWriteLockIfErronouslyLocked();
-      table.getLock().unlock();
+      table.releaseWriteLock();
     }
 
     if (update.isSync_ddl()) {
@@ -5175,7 +5174,7 @@ public class CatalogOpExecutor {
       throws CatalogException, InternalException, ImpalaRuntimeException {
     Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(),
         "Load for ALTER COMMENT");
-    tryLock(tbl);
+    tryWriteLock(tbl);
     try {
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       catalog_.getLock().writeLock().unlock();
@@ -5195,7 +5194,7 @@ public class CatalogOpExecutor {
       addTableToCatalogUpdate(tbl, wantMinimalResult, response.result);
       addSummary(response, String.format("Updated %s.", (isView) ? "view" : 
"table"));
     } finally {
-      tbl.getLock().unlock();
+      tbl.releaseWriteLock();
     }
   }
 
@@ -5204,7 +5203,7 @@ public class CatalogOpExecutor {
       throws CatalogException, InternalException, ImpalaRuntimeException {
     Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(),
         "Load for ALTER COLUMN COMMENT");
-    tryLock(tbl);
+    tryWriteLock(tbl);
     try {
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       catalog_.getLock().writeLock().unlock();
@@ -5230,7 +5229,7 @@ public class CatalogOpExecutor {
       addTableToCatalogUpdate(tbl, wantMinimalResult, response.result);
       addSummary(response, "Column has been altered.");
     } finally {
-      tbl.getLock().unlock();
+      tbl.releaseWriteLock();
     }
   }
 
@@ -5251,20 +5250,20 @@ public class CatalogOpExecutor {
   }
 
   /**
-   * Try to lock a table in the catalog. Throw an InternalException if the 
catalog is
-   * unable to lock the given table.
+   * Tries to take the write lock of the table in the catalog. Throws an 
InternalException
+   * if the catalog is unable to lock the given table.
    */
-  private void tryLock(Table tbl) throws InternalException {
-    tryLock(tbl, "altering");
+  private void tryWriteLock(Table tbl) throws InternalException {
+    tryWriteLock(tbl, "altering");
   }
 
   /**
    * Tries to take the write lock of the table in the catalog for a given operation.
    * Throws an InternalException if the catalog is unable to lock the given table.
    */
-  private void tryLock(Table tbl, String operation) throws InternalException {
+  private void tryWriteLock(Table tbl, String operation) throws 
InternalException {
     String type = tbl instanceof View ? "view" : "table";
-    if (!catalog_.tryLockTable(tbl)) {
+    if (!catalog_.tryWriteLock(tbl)) {
       throw new InternalException(String.format("Error %s (for) %s %s due to " 
+
           "lock contention.", operation, type, tbl.getFullName()));
     }
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java 
b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
index 4138c2f..9789bc6 100644
--- 
a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
+++ 
b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
@@ -264,11 +264,11 @@ public class CatalogObjectToFromThriftTest {
 
   private TTable getThriftTable(Table table) {
     TTable thriftTable = null;
-    table.getLock().lock();
+    table.takeReadLock();
     try {
       thriftTable = table.toThrift();
     } finally {
-      table.getLock().unlock();
+      table.releaseReadLock();
     }
     return thriftTable;
   }
diff --git a/tests/custom_cluster/test_topic_update_frequency.py 
b/tests/custom_cluster/test_topic_update_frequency.py
new file mode 100644
index 0000000..65f3f73
--- /dev/null
+++ b/tests/custom_cluster/test_topic_update_frequency.py
@@ -0,0 +1,213 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from multiprocessing.pool import ThreadPool
+
+import pytest
+import time
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.skip import SkipIfS3
+
+
# NOTE(review): this decorator line was mangled by the mailing-list email
# obfuscator ("[email protected]_listing_times"); it is assumed to be
# SkipIfS3.variable_listing_times -- confirm against the repository.
@SkipIfS3.variable_listing_times
class TestTopicUpdateFrequency(CustomClusterTestSuite):
  """Regression tests for IMPALA-6671: the catalogd topic-update thread must be
  able to skip tables which are write-locked for longer than
  --topic_update_tbl_max_wait_time_ms, so that unrelated queries waiting on
  topic updates (catalog-v1 mode) are not blocked behind slow DDL/refresh
  operations holding the table lock."""

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    catalogd_args="--topic_update_tbl_max_wait_time_ms=500")
  def test_topic_updates_unblock(self):
    """Simulates the query-blocking conditions of IMPALA-6671 and makes sure
    that unrelated queries are not blocked by long-running queries which hold
    the table lock and would otherwise stall topic updates."""
    # Queries that we don't expect to block when a slow blocking query is
    # running in parallel. We want these queries to request the metadata from
    # catalogd, hence the init queries invalidate the metadata before each test
    # case run below.
    non_blocking_queries = [
      # Each of these takes about 2-4 seconds when there is no lock contention.
      "describe functional.emptytable",
      "select * from functional.tinytable limit 1",
      "show partitions functional.alltypessmall",
    ]
    # Queries used to reset the metadata of the non_blocking_queries so that
    # they will reload the next time they are executed.
    init_queries = [
      "invalidate metadata functional.emptytable",
      "invalidate metadata functional.tinytable",
      "invalidate metadata functional.alltypessmall",
    ]
    # Make sure that the blocking queries' metadata is loaded in catalogd since
    # the table lock is only acquired on loaded tables.
    self.client.execute("refresh tpcds.store_sales")
    self.client.execute("refresh functional.alltypes")
    # Add the debug actions so that blocking queries take a long time to
    # complete while holding the table lock. These delays are tuned so that
    # each of the blocking queries below takes a little more than 10 seconds
    # (2x slower than the fast queries).
    debug_action = ("catalogd_refresh_hdfs_listing_delay:SLEEP@30"
                    "|catalogd_table_recover_delay:SLEEP@10000"
                    "|catalogd_update_stats_delay:SLEEP@10000")
    blocking_query_options = {
      "debug_action": debug_action,
      "sync_ddl": "false"
    }
    blocking_queries = [
      "refresh tpcds.store_sales",
      "alter table tpcds.store_sales recover partitions",
      "compute stats functional.alltypes"
    ]

    for blocking_query in blocking_queries:
      print("Running blocking query: {0}".format(blocking_query))
      # Case 1: blocking query without sync_ddl.
      blocking_query_options["sync_ddl"] = "false"
      self.__run_topic_update_test(blocking_query,
        non_blocking_queries, init_queries,
        blocking_query_options=blocking_query_options)
      # Case 2: blocking query with sync_ddl.
      blocking_query_options["sync_ddl"] = "true"
      self.__run_topic_update_test(blocking_query,
        non_blocking_queries, init_queries,
        blocking_query_options=blocking_query_options)
      # Case 3: blocking query with sync_ddl, fast queries with sync_ddl.
      non_blocking_query_options = {
        "sync_ddl": "true",
      }
      self.__run_topic_update_test(blocking_query,
        non_blocking_queries, init_queries,
        blocking_query_options=blocking_query_options,
        non_blocking_query_options=non_blocking_query_options)
      # Case 4: blocking query without sync_ddl, fast queries with sync_ddl.
      blocking_query_options["sync_ddl"] = "false"
      self.__run_topic_update_test(blocking_query,
        non_blocking_queries, init_queries,
        blocking_query_options=blocking_query_options,
        non_blocking_query_options=non_blocking_query_options)

  def __run_topic_update_test(self, slow_blocking_query, fast_queries,
      init_queries, blocking_query_options,
      non_blocking_query_options=None, blocking_query_min_time=10000,
      fast_query_timeout_ms=6000, non_blocking_impalad=0,
      expect_topic_updates_to_block=False):
    """Runs the slow query in one Impala client and then creates separate
    Impala clients to run the fast_queries concurrently. When
    expect_topic_updates_to_block is False it asserts that every fast query
    returns within fast_query_timeout_ms, i.e. it was not blocked by the slow
    query; when True it asserts the opposite: each fast query takes longer
    than blocking_query_min_time because topic updates were held back."""
    assert fast_query_timeout_ms < blocking_query_min_time
    # Run the init queries first so that all the impalads are starting with a
    # clean metadata state.
    for q in init_queries:
      self.execute_query(q)

    pool = ThreadPool(processes=len(fast_queries))
    # FIX (review): the original used ThreadPool(processes=len(
    # slow_blocking_query)), which is the length of the query *string* and
    # would create dozens of workers for a single task. One worker suffices.
    slow_query_pool = ThreadPool(processes=1)
    # Run the slow query on impalad-0 with the given query options.
    slow_query_future = slow_query_pool.apply_async(self.exec_and_time,
      args=(slow_blocking_query, blocking_query_options, 0))
    # There is no better way than to wait for some time to make sure that the
    # slow query has been submitted and is being compiled before we start the
    # non-blocking queries.
    time.sleep(1)
    fast_query_futures = {}
    for fast_query in fast_queries:
      # Run the other queries on the impalad given by non_blocking_impalad
      # (defaults to impalad-0).
      fast_query_futures[fast_query] = pool.apply_async(self.exec_and_time,
        args=(fast_query, non_blocking_query_options, non_blocking_impalad))

    for fast_query in fast_query_futures:
      elapsed_ms = fast_query_futures[fast_query].get()
      if not expect_topic_updates_to_block:
        assert elapsed_ms < fast_query_timeout_ms, \
          "{0} did not complete within {1} msec".format(fast_query,
            fast_query_timeout_ms)
      else:
        # Topic updates are expected to block, hence the other queries should
        # only complete after the blocking query finishes.
        # FIX (review): the original was missing the 'assert' keyword here, so
        # the comparison was a discarded expression and this branch verified
        # nothing. The message was also inverted for this condition.
        assert elapsed_ms > blocking_query_min_time, \
          "{0} completed in {1} msec although topic updates were expected " \
          "to block for at least {2} msec".format(fast_query, elapsed_ms,
            blocking_query_min_time)
    # Make sure that the slow query exceeds the given timeout; otherwise the
    # test doesn't make much sense.
    assert slow_query_future.get() > blocking_query_min_time, \
      "{0} query took less time than {1} msec".format(slow_blocking_query,
        blocking_query_min_time)
    # Wait for some time here to make sure that the topic updates from the
    # last query have been propagated, so that the next run of this method
    # starts from a clean state.
    time.sleep(2)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    catalogd_args="--topic_update_tbl_max_wait_time_ms=500")
  def test_topic_updates_advance(self):
    """Makes sure that if long-running blocking queries are run continuously,
    the topic-update thread is not starved: it eventually blocks until it
    acquires the table lock and the queries still complete."""
    # Each of these queries takes about 30s to complete with the debug action
    # delays below.
    blocking_queries = [
      "refresh tpcds.store_sales",
      "alter table tpcds.store_sales recover partitions",
      "compute stats functional.alltypes"
    ]
    debug_action = ("catalogd_refresh_hdfs_listing_delay:SLEEP@30"
                    "|catalogd_table_recover_delay:SLEEP@10000"
                    "|catalogd_update_stats_delay:SLEEP@10000")
    # Loop in sync_ddl mode so that we know the topic updates are being
    # propagated.
    blocking_query_options = {
      "debug_action": debug_action,
      "sync_ddl": "true"
    }
    self.__run_loop_test(blocking_queries, blocking_query_options, 60000)

  def __run_loop_test(self, blocking_queries, blocking_query_options, timeout):
    """Runs the given list of queries with the given query options in a loop
    (concurrently, one thread per query) and makes sure each iteration
    completes within the given timeout in msec."""
    slow_query_pool = ThreadPool(processes=len(blocking_queries))
    slow_query_futures = {}
    for q in blocking_queries:
      print("Running blocking query {0}".format(q))
      slow_query_futures[q] = slow_query_pool.apply_async(self.loop_exec,
        args=(q, blocking_query_options))

    for q in slow_query_futures:
      # Make sure that the queries complete eventually.
      durations = slow_query_futures[q].get()
      for i in range(len(durations)):
        assert durations[i] < timeout, "Query {0} iteration {1} did " \
                                       "not complete within {2}.".format(q, i,
                                         timeout)

  def loop_exec(self, query, query_options, iterations=3, impalad=0):
    """Executes the query 'iterations' times on the given impalad and returns
    the list of per-iteration durations in msec."""
    durations = []
    # Renamed loop variable: the original shadowed the builtin 'iter'.
    for i in range(iterations):
      durations.append(self.exec_and_time(query, query_options, impalad))
    return durations

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    catalogd_args="--topic_update_tbl_max_wait_time_ms=0")
  def test_topic_lock_timeout_disabled(self):
    """Makes sure that the topic-update thread blocks until tables are added
    to each topic update when topic_update_tbl_max_wait_time_ms is set to 0
    (i.e. the lock timeout is disabled)."""
    # Queries that would normally not block; here we expect them to be held
    # up by the blocking query since skipping locked tables is disabled.
    non_blocking_queries = [
      # Takes about 2-4 seconds when there is no lock contention.
      "describe functional.emptytable"
    ]
    # Query used to reset the metadata of the non_blocking_queries so that
    # they will reload the next time they are executed.
    init_queries = [
      "invalidate metadata functional.emptytable",
    ]
    # Make sure that the blocking query's metadata is loaded in catalogd since
    # the table lock is only acquired on loaded tables.
    blocking_query = "refresh tpcds.store_sales"
    debug_action = "catalogd_refresh_hdfs_listing_delay:SLEEP@30"
    self.client.execute(blocking_query)
    blocking_query_options = {
      "debug_action": debug_action,
      "sync_ddl": "false"
    }
    self.__run_topic_update_test(blocking_query,
      non_blocking_queries, init_queries,
      blocking_query_options=blocking_query_options,
      expect_topic_updates_to_block=True)

Reply via email to