Repository: incubator-impala Updated Branches: refs/heads/master 2665866c1 -> 8377b9949
IMPALA-4765: Avoid using several loading threads on one table. When there are multiple concurrent requests asking catalogd to prioritize loading the same table, several catalog loading threads may end up waiting for that single table to be loaded, effectively reducing the number of usable catalog loading threads. In extreme cases, this can degrade to serial loading of tables. This patch augments the existing data structures and code to prevent using several loading threads for the same table. Some of the existing data structures and code could be consolidated or simplified, but this patch deliberately does not do so, to minimize the risk of this change. Testing: I could easily reproduce the bug locally with the steps described in the JIRA. After this patch, I no longer observed wasted loading threads. Change-Id: Idba5f1808e0b9cbbcf46245834d8ad38d01231cb Reviewed-on: http://gerrit.cloudera.org:8080/5707 Reviewed-by: Alex Behm <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/fa4a054c Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/fa4a054c Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/fa4a054c Branch: refs/heads/master Commit: fa4a054cde012fc0cfc74b79cbdb7008491226bb Parents: 2665866 Author: Alex Behm <[email protected]> Authored: Thu Jan 12 17:51:51 2017 -0800 Committer: Impala Public Jenkins <[email protected]> Committed: Sun Jan 15 08:38:13 2017 +0000 ---------------------------------------------------------------------- .../apache/impala/catalog/TableLoadingMgr.java | 55 ++++++++++++-------- 1 file changed, 34 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fa4a054c/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java 
---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java b/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java index 17f962d..35cb902 100644 --- a/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java +++ b/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java @@ -17,11 +17,8 @@ package org.apache.impala.catalog; -import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -30,11 +27,12 @@ import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.log4j.Logger; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.impala.thrift.TTableName; import org.apache.impala.util.HdfsCachingUtil; +import org.apache.log4j.Logger; + import com.google.common.base.Preconditions; import com.google.common.collect.Maps; @@ -102,14 +100,18 @@ public class TableLoadingMgr { private final LinkedBlockingDeque<TTableName> tableLoadingDeque_ = new LinkedBlockingDeque<TTableName>(); - // A thread safe HashSet of table names that are in the tableLoadingDeque_. Used to - // efficiently check for existence of items in the deque. - // Updates may lead/lag updates to the tableLoadingDeque_ - they are added to this set - // immediately before being added to the deque and removed immediately after removing - // from the deque. The fact the updates are not synchronized shouldn't impact - // functionality since this set is only used for efficient lookups. 
- private final Set<TTableName> tableLoadingSet_ = - Collections.synchronizedSet(new HashSet<TTableName>()); + // Maps from table name to a boolean indicating whether that table is currently + // being loaded by a table loading thread. Used to prevent adding superfluous + // entries to the deque, and to ensure that only a single table loading thread + // is consumed per table. + // Entries are added to this map immediately before being added to the deque and + // removed after a load has completed. + // Once the load of a table begins, its associated boolean is set to true, and + // attempts to load the same table by a different thread become no-ops. + // This map is different from loadingTables_ because the latter tracks all in-flight + // loads - even those being processed by threads other than table loading threads. + private final ConcurrentHashMap<TTableName, AtomicBoolean> tableLoadingBarrier_ = + new ConcurrentHashMap<TTableName, AtomicBoolean>(); // Map of table name to a FutureTask associated with the table load. Used to // prevent duplicate loads of the same table. @@ -175,7 +177,10 @@ public class TableLoadingMgr { * Prioritizes the loading of the given table. */ public void prioritizeLoad(TTableName tblName) { - tableLoadingSet_.add(tblName); + AtomicBoolean isLoading = + tableLoadingBarrier_.putIfAbsent(tblName, new AtomicBoolean(false)); + // Only queue the table if a load is not already in progress. + if (isLoading != null && isLoading.get()) return; tableLoadingDeque_.offerFirst(tblName); } @@ -183,9 +188,9 @@ public class TableLoadingMgr { * Submits a single table for background (low priority) loading. */ public void backgroundLoad(TTableName tblName) { - // Only queue for background loading if the table doesn't already exist - // in the table loading set. - if (tableLoadingSet_.add(tblName)) { + // Only queue for background loading if the table isn't already queued or + // currently being loaded. 
+ if (tableLoadingBarrier_.putIfAbsent(tblName, new AtomicBoolean(false)) == null) { tableLoadingDeque_.offerLast(tblName); } } @@ -277,10 +282,16 @@ public class TableLoadingMgr { private void loadNextTable() throws InterruptedException { // Always get the next table from the head of the deque. final TTableName tblName = tableLoadingDeque_.takeFirst(); - tableLoadingSet_.remove(tblName); - if (LOG.isTraceEnabled()) { - LOG.trace("Loading next table. Remaining items in queue: " - + tableLoadingDeque_.size()); + LOG.info("Loading next table from queue: " + + tblName.db_name + "." + tblName.table_name); + LOG.info(String.format("Remaining items in queue: %s. Loads in progress: %s", + tableLoadingDeque_.size(), loadingTables_.size())); + + AtomicBoolean isLoading = tableLoadingBarrier_.get(tblName); + if (isLoading == null || !isLoading.compareAndSet(false, true)) { + // Another thread has already completed the load or the load is still in progress. + // Return so this thread can work on another table in the queue. + return; } try { // TODO: Instead of calling "getOrLoad" here we could call "loadAsync". We would @@ -288,6 +299,8 @@ public class TableLoadingMgr { catalog_.getOrLoadTable(tblName.getDb_name(), tblName.getTable_name()); } catch (CatalogException e) { // Ignore. + } finally { + tableLoadingBarrier_.remove(tblName); } }
