This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1.9 by this push:
new 42a3534 Fix multiple concurrency bugs in
Master.gatherTableInformation() (#546)
42a3534 is described below
commit 42a3534ebcd1cd27df863f10487708d4648ab03a
Author: Keith Turner <[email protected]>
AuthorDate: Tue Jul 10 18:04:59 2018 -0400
Fix multiple concurrency bugs in Master.gatherTableInformation() (#546)
Master.gatherTableInformation() had the following problems:
* If Property.MASTER_STATUS_THREAD_POOL_SIZE was set > 1, then multiple threads
would concurrently put into a TreeMap, which is not thread safe
* Created a thread pool and never called shutdownNow() on it
* Returned a reference to a TreeMap that threads in the thread pool may still
be adding to.
This patch also attempts to address the issues brought up in #402 by switching
to a cached thread pool when the configured pool size is zero (the new
default). This will allow the thread pool to expand so that unresponsive
tservers do not prevent gathering status from responsive tservers.
---
.../org/apache/accumulo/core/conf/Property.java | 5 +--
.../java/org/apache/accumulo/master/Master.java | 36 ++++++++++++++++------
2 files changed, 29 insertions(+), 12 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 1eed867..6a2024e 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -340,8 +340,9 @@ public enum Property {
MASTER_REPLICATION_COORDINATOR_THREADCHECK("master.replication.coordinator.threadcheck.time",
"5s", PropertyType.TIMEDURATION,
"The time between adjustments of the coordinator thread pool"),
- MASTER_STATUS_THREAD_POOL_SIZE("master.status.threadpool.size", "1",
PropertyType.COUNT,
- "The number of threads to use when fetching the tablet server status for
balancing."),
+ MASTER_STATUS_THREAD_POOL_SIZE("master.status.threadpool.size", "0",
PropertyType.COUNT,
+ "The number of threads to use when fetching the tablet server status for
balancing. Zero "
+ + "indicates an unlimited number of threads will be used."),
MASTER_METADATA_SUSPENDABLE("master.metadata.suspendable", "false",
PropertyType.BOOLEAN,
"Allow tablets for the " + MetadataTable.NAME
+ " table to be suspended via table.suspend.duration."),
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java
b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 9414b98..2f124e3 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -32,6 +32,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -163,6 +164,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Iterables;
/**
@@ -1065,7 +1067,7 @@ public class Master extends AccumuloServerContext
private long updateStatus()
throws AccumuloException, AccumuloSecurityException,
TableNotFoundException {
Set<TServerInstance> currentServers = tserverSet.getCurrentServers();
- tserverStatus =
Collections.synchronizedSortedMap(gatherTableInformation(currentServers));
+ tserverStatus = gatherTableInformation(currentServers);
checkForHeldServer(tserverStatus);
if (!badServers.isEmpty()) {
@@ -1146,12 +1148,20 @@ public class Master extends AccumuloServerContext
private SortedMap<TServerInstance,TabletServerStatus> gatherTableInformation(
Set<TServerInstance> currentServers) {
+ final long rpcTimeout =
getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
+ int threads =
getConfiguration().getCount(Property.MASTER_STATUS_THREAD_POOL_SIZE);
+ ExecutorService tp = threads == 0 ? Executors.newCachedThreadPool()
+ : Executors.newFixedThreadPool(threads);
long start = System.currentTimeMillis();
- int threads =
Math.max(getConfiguration().getCount(Property.MASTER_STATUS_THREAD_POOL_SIZE),
1);
- ExecutorService tp = Executors.newFixedThreadPool(threads);
- final SortedMap<TServerInstance,TabletServerStatus> result = new
TreeMap<>();
+ final SortedMap<TServerInstance,TabletServerStatus> result = new
ConcurrentSkipListMap<>();
for (TServerInstance serverInstance : currentServers) {
final TServerInstance server = serverInstance;
+ if (threads == 0) {
+ // Since an unbounded thread pool is being used, rate limit how fast
task are added to the
+ // executor. This prevents the threads from growing large unless there
are lots of
+ // unresponsive tservers.
+ sleepUninterruptibly(Math.max(1, rpcTimeout / 120_000),
TimeUnit.MILLISECONDS);
+ }
tp.submit(new Runnable() {
@Override
public void run() {
@@ -1191,18 +1201,24 @@ public class Master extends AccumuloServerContext
}
tp.shutdown();
try {
-
tp.awaitTermination(getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT)
* 2,
- TimeUnit.MILLISECONDS);
+ tp.awaitTermination(Math.max(10000, rpcTimeout / 3),
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.debug("Interrupted while fetching status");
}
+
+ tp.shutdownNow();
+
+ // Threads may still modify map after shutdownNow is called, so create an
immutable snapshot.
+ SortedMap<TServerInstance,TabletServerStatus> info =
ImmutableSortedMap.copyOf(result);
+
synchronized (badServers) {
badServers.keySet().retainAll(currentServers);
- badServers.keySet().removeAll(result.keySet());
+ badServers.keySet().removeAll(info.keySet());
}
- log.debug(String.format("Finished gathering information from %d servers in
%.2f seconds",
- result.size(), (System.currentTimeMillis() - start) / 1000.));
- return result;
+ log.debug(String.format("Finished gathering information from %d of %d
servers in %.2f seconds",
+ info.size(), currentServers.size(), (System.currentTimeMillis() -
start) / 1000.));
+
+ return info;
}
public void run() throws IOException, InterruptedException, KeeperException {