ACCUMULO-1000 modified conditional map to never run more than one send task per tserver at a time, and to reuse sessions
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/fdb95b40 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/fdb95b40 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/fdb95b40 Branch: refs/heads/ACCUMULO-1000 Commit: fdb95b40513094786b646dd682bb9d58ae06365d Parents: ec53713 Author: [email protected] <[email protected]> Authored: Sat Jul 20 12:07:23 2013 -0400 Committer: [email protected] <[email protected]> Committed: Sat Jul 20 12:07:23 2013 -0400 ---------------------------------------------------------------------- .../core/client/impl/ConditionalWriterImpl.java | 128 ++++++++++++++++--- .../server/tabletserver/TabletServer.java | 2 +- 2 files changed, 108 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/fdb95b40/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java index 31403fb..0e86ec7 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java @@ -90,8 +90,12 @@ class ConditionalWriterImpl implements ConditionalWriter { private TabletLocator locator; private String tableId; - - private Map<String,BlockingQueue<TabletServerMutations<QCMutation>>> serverQueues; + private static class ServerQueue { + BlockingQueue<TabletServerMutations<QCMutation>> queue = new LinkedBlockingQueue<TabletServerMutations<QCMutation>>(); + boolean taskQueued = false; + } + + private Map<String,ServerQueue> serverQueues; private DelayQueue<QCMutation> failedMutations = new 
DelayQueue<QCMutation>(); private ScheduledThreadPoolExecutor threadPool; @@ -168,16 +172,17 @@ class ConditionalWriterImpl implements ConditionalWriter { } } - private BlockingQueue<TabletServerMutations<QCMutation>> getServerQueue(String location) { - BlockingQueue<TabletServerMutations<QCMutation>> queue; + private ServerQueue getServerQueue(String location) { + ServerQueue serverQueue; synchronized (serverQueues) { - queue = serverQueues.get(location); - if (queue == null) { - queue = new LinkedBlockingQueue<TabletServerMutations<QCMutation>>(); - serverQueues.put(location, queue); + serverQueue = serverQueues.get(location); + if (serverQueue == null) { + + serverQueue = new ServerQueue(); + serverQueues.put(location, serverQueue); } } - return queue; + return serverQueue; } private void queueRetry(List<QCMutation> mutations) { @@ -222,14 +227,38 @@ class ConditionalWriterImpl implements ConditionalWriter { private void queue(String location, TabletServerMutations<QCMutation> mutations) { - BlockingQueue<TabletServerMutations<QCMutation>> queue = getServerQueue(location); + ServerQueue serverQueue = getServerQueue(location); - queue.add(mutations); - threadPool.execute(new LoggingRunnable(log, new SendTask(location))); + synchronized (serverQueue) { + serverQueue.queue.add(mutations); + //never execute more that one task per server + if(!serverQueue.taskQueued){ + threadPool.execute(new LoggingRunnable(log, new SendTask(location))); + serverQueue.taskQueued = true; + } + } + } + private void reschedule(SendTask task){ + ServerQueue serverQueue = getServerQueue(task.location); + // just finished processing work for this server, could reschedule if it has more work or immediately process the work + // this code reschedules the the server for processing later... there may be other queues with + // more data that need to be processed... also it will give the current server time to build + // up more data... 
the thinking is that rescheduling instead or processing immediately will result + // in bigger batches and less RPC overhead + + synchronized (serverQueue) { + if(serverQueue.queue.size() > 0) + threadPool.execute(new LoggingRunnable(log, task)); + else + serverQueue.taskQueued = false; + } + + } + private TabletServerMutations<QCMutation> dequeue(String location) { - BlockingQueue<TabletServerMutations<QCMutation>> queue = getServerQueue(location); + BlockingQueue<TabletServerMutations<QCMutation>> queue = getServerQueue(location).queue; ArrayList<TabletServerMutations<QCMutation>> mutations = new ArrayList<TabletLocator.TabletServerMutations<QCMutation>>(); queue.drainTo(mutations); @@ -268,7 +297,7 @@ class ConditionalWriterImpl implements ConditionalWriter { this.threadPool = new ScheduledThreadPoolExecutor(3); this.threadPool.setMaximumPoolSize(3); this.locator = TabletLocator.getLocator(instance, new Text(tableId)); - this.serverQueues = new HashMap<String,BlockingQueue<TabletServerMutations<QCMutation>>>(); + this.serverQueues = new HashMap<String,ServerQueue>(); this.tableId = tableId; Runnable failureHandler = new Runnable() { @@ -319,7 +348,7 @@ class ConditionalWriterImpl implements ConditionalWriter { private class SendTask implements Runnable { - private String location; + String location; public SendTask(String location) { this.location = location; @@ -331,6 +360,8 @@ class ConditionalWriterImpl implements ConditionalWriter { TabletServerMutations<QCMutation> mutations = dequeue(location); if (mutations != null) sendToServer(location, mutations); + + reschedule(this); } } @@ -345,6 +376,55 @@ class ConditionalWriterImpl implements ConditionalWriter { } } + private static class SessionID { + long sessionID; + boolean reserved; + } + + private HashMap<String, SessionID> cachedSessionIDs = new HashMap<String, SessionID>(); + + private Long reserveSessionID(String location, TabletClientService.Iface client, TInfo tinfo) throws ThriftSecurityException, 
TException { + //avoid cost of repeatedly making RPC to create sessions, reuse sessions + synchronized (cachedSessionIDs) { + SessionID sid = cachedSessionIDs.get(location); + if (sid != null) { + if (sid.reserved) + throw new IllegalStateException(); + + sid.reserved = true; + return sid.sessionID; + } + } + + Long sessionId = client.startConditionalUpdate(tinfo, credentials, ByteBufferUtil.toByteBuffers(auths.getAuthorizations()), tableId); + + synchronized (cachedSessionIDs) { + SessionID sid = new SessionID(); + sid.reserved = true; + sid.sessionID = sessionId; + if(cachedSessionIDs.put(location, sid) != null) + throw new IllegalStateException(); + } + + return sessionId; + } + + private void invalidateSessionID(String location) { + synchronized (cachedSessionIDs) { + cachedSessionIDs.remove(location); + } + + } + + private void unreserveSessionID(String location){ + synchronized (cachedSessionIDs) { + SessionID sid = cachedSessionIDs.get(location); + if(!sid.reserved) + throw new IllegalStateException(); + sid.reserved = false; + } + } + private void sendToServer(String location, TabletServerMutations<QCMutation> mutations) { TabletClientService.Iface client = null; @@ -363,11 +443,17 @@ class ConditionalWriterImpl implements ConditionalWriter { CompressedIterators compressedIters = new CompressedIterators(); convertMutations(mutations, cmidToCm, cmid, tmutations, compressedIters); - //TODO create a session per tserver and keep reusing it - sessionId = client.startConditionalUpdate(tinfo, credentials, ByteBufferUtil.toByteBuffers(auths.getAuthorizations()), tableId); + List<TCMResult> tresults = null; + while (tresults == null) { + try { + sessionId = reserveSessionID(location, client, tinfo); + tresults = client.conditionalUpdate(tinfo, sessionId, tmutations, compressedIters.getSymbolTable()); + } catch (NoSuchScanIDException nssie) { + sessionId = null; + invalidateSessionID(location); + } + } - List<TCMResult> tresults = client.conditionalUpdate(tinfo, 
sessionId, tmutations, compressedIters.getSymbolTable()); - HashSet<KeyExtent> extentsToInvalidate = new HashSet<KeyExtent>(); ArrayList<QCMutation> ignored = new ArrayList<QCMutation>(); @@ -392,8 +478,6 @@ class ConditionalWriterImpl implements ConditionalWriter { queueRetry(ignored); - } catch (NoSuchScanIDException nssie){ - queueRetry(cmidToCm); } catch (ThriftSecurityException tse) { AccumuloSecurityException ase = new AccumuloSecurityException(credentials.getPrincipal(), tse.getCode(), Tables.getPrintableTableInfoFromId(instance, tableId), tse); @@ -409,6 +493,7 @@ class ConditionalWriterImpl implements ConditionalWriter { } catch (Exception e) { queueException(location, cmidToCm, e); } finally { + unreserveSessionID(location); ThriftUtil.returnClient((TServiceClient) client); } } @@ -591,6 +676,7 @@ class ConditionalWriterImpl implements ConditionalWriter { @Override public void close() { + //TODO could possible close cached sessions using async method to clean up sessions on server side threadPool.shutdownNow(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/fdb95b40/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java index ee1d1b6..013639e 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java @@ -1962,7 +1962,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu return results; }finally{ - sessionManager.removeSession(sessID, true); + sessionManager.unreserveSession(sessID); } }
