Repository: incubator-impala
Updated Branches:
  refs/heads/master 2665866c1 -> 8377b9949


IMPALA-4765: Avoid using several loading threads on one table.

When there are multiple concurrent requests to the catalogd to
prioritize loading the same table, then several catalog loading
threads may end up waiting for that single table to be loaded,
effectively reducing the number of catalog loading threads. In
extreme examples, this might degrade to serial loading of tables.

This patch augments the existing data structures and code to
prevent using several loading threads for the same table.
Some of the existing data structures and code could be
consolidated/simplified but this patch does not try to address
that issue to minimize the risk of this change.

Testing: I could easily reproduce the bug locally with the steps
described in the JIRA. After this patch, I could not observe threads
being wasted anymore.

Change-Id: Idba5f1808e0b9cbbcf46245834d8ad38d01231cb
Reviewed-on: http://gerrit.cloudera.org:8080/5707
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/fa4a054c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/fa4a054c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/fa4a054c

Branch: refs/heads/master
Commit: fa4a054cde012fc0cfc74b79cbdb7008491226bb
Parents: 2665866
Author: Alex Behm <[email protected]>
Authored: Thu Jan 12 17:51:51 2017 -0800
Committer: Impala Public Jenkins <[email protected]>
Committed: Sun Jan 15 08:38:13 2017 +0000

----------------------------------------------------------------------
 .../apache/impala/catalog/TableLoadingMgr.java  | 55 ++++++++++++--------
 1 file changed, 34 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fa4a054c/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java b/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java
index 17f962d..35cb902 100644
--- a/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java
+++ b/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java
@@ -17,11 +17,8 @@
 
 package org.apache.impala.catalog;
 
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -30,11 +27,12 @@ import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.log4j.Logger;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.util.HdfsCachingUtil;
+import org.apache.log4j.Logger;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
@@ -102,14 +100,18 @@ public class TableLoadingMgr {
   private final LinkedBlockingDeque<TTableName> tableLoadingDeque_ =
       new LinkedBlockingDeque<TTableName>();
 
-  // A thread safe HashSet of table names that are in the tableLoadingDeque_. Used to
-  // efficiently check for existence of items in the deque.
-  // Updates may lead/lag updates to the tableLoadingDeque_ - they are added to this set
-  // immediately before being added to the deque and removed immediately after removing
-  // from the deque. The fact the updates are not synchronized shouldn't impact
-  // functionality since this set is only used for efficient lookups.
-  private final Set<TTableName> tableLoadingSet_ =
-      Collections.synchronizedSet(new HashSet<TTableName>());
+  // Maps from table name to a boolean indicating whether that table is currently
+  // being loaded by a table loading thread. Used to prevent adding superfluous
+  // entries to the deque, and to ensure that only a single table loading thread
+  // is consumed per table.
+  // Entries are added to this map immediately before being added to the deque and
+  // removed after a load has completed.
+  // Once the load of a table begins, its associated boolean is set to true, and
+  // attempts to load the same table by a different thread become no-ops.
+  // This map is different from loadingTables_ because the latter tracks all in-flight
+  // loads - even those being processed by threads other than table loading threads.
+  private final ConcurrentHashMap<TTableName, AtomicBoolean> tableLoadingBarrier_ =
+      new ConcurrentHashMap<TTableName, AtomicBoolean>();
 
   // Map of table name to a FutureTask associated with the table load. Used to
   // prevent duplicate loads of the same table.
@@ -175,7 +177,10 @@ public class TableLoadingMgr {
    * Prioritizes the loading of the given table.
    */
   public void prioritizeLoad(TTableName tblName) {
-    tableLoadingSet_.add(tblName);
+    AtomicBoolean isLoading =
+        tableLoadingBarrier_.putIfAbsent(tblName, new AtomicBoolean(false));
+    // Only queue the table if a load is not already in progress.
+    if (isLoading != null && isLoading.get()) return;
     tableLoadingDeque_.offerFirst(tblName);
   }
 
@@ -183,9 +188,9 @@ public class TableLoadingMgr {
    * Submits a single table for background (low priority) loading.
    */
   public void backgroundLoad(TTableName tblName) {
-    // Only queue for background loading if the table doesn't already exist
-    // in the table loading set.
-    if (tableLoadingSet_.add(tblName)) {
+    // Only queue for background loading if the table isn't already queued or
+    // currently being loaded.
+    if (tableLoadingBarrier_.putIfAbsent(tblName, new AtomicBoolean(false)) == null) {
       tableLoadingDeque_.offerLast(tblName);
     }
   }
@@ -277,10 +282,16 @@ public class TableLoadingMgr {
   private void loadNextTable() throws InterruptedException {
     // Always get the next table from the head of the deque.
     final TTableName tblName = tableLoadingDeque_.takeFirst();
-    tableLoadingSet_.remove(tblName);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Loading next table. Remaining items in queue: "
-          + tableLoadingDeque_.size());
+    LOG.info("Loading next table from queue: " +
+        tblName.db_name + "." + tblName.table_name);
+    LOG.info(String.format("Remaining items in queue: %s. Loads in progress: %s",
+        tableLoadingDeque_.size(), loadingTables_.size()));
+
+    AtomicBoolean isLoading = tableLoadingBarrier_.get(tblName);
+    if (isLoading == null || !isLoading.compareAndSet(false, true)) {
+      // Another thread has already completed the load or the load is still in progress.
+      // Return so this thread can work on another table in the queue.
+      return;
     }
     try {
       // TODO: Instead of calling "getOrLoad" here we could call "loadAsync". We would
@@ -288,6 +299,8 @@ public class TableLoadingMgr {
       catalog_.getOrLoadTable(tblName.getDb_name(), tblName.getTable_name());
     } catch (CatalogException e) {
       // Ignore.
+    } finally {
+      tableLoadingBarrier_.remove(tblName);
     }
   }
 

Reply via email to