Repository: apex-core Updated Branches: refs/heads/master cc79b0c02 -> a9e4e053b
APEXCORE-583 - Buffer Server LogicalNode should not be reused by Subscribers Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/d28c0ddd Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/d28c0ddd Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/d28c0ddd Branch: refs/heads/master Commit: d28c0ddd9d259855c1d9ba623bfb970342bc40c4 Parents: c97dd7c Author: Vlad Rozov <[email protected]> Authored: Fri Dec 2 19:08:49 2016 -0800 Committer: Vlad Rozov <[email protected]> Committed: Fri Dec 2 19:08:49 2016 -0800 ---------------------------------------------------------------------- .../datatorrent/bufferserver/server/Server.java | 259 ++++++++----------- 1 file changed, 109 insertions(+), 150 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/d28c0ddd/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index 12eed5f..e720248 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -113,14 +113,27 @@ public class Server implements ServerListener @Override public void unregistered(SelectionKey key) { - serverHelperExecutor.shutdown(); - storageHelperExecutor.shutdown(); - try { - serverHelperExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS); - } catch (InterruptedException ex) { - logger.debug("Executor Termination", ex); + for (LogicalNode ln : subscriberGroups.values()) { + ln.boot(eventloop); } - logger.info("Server stopped listening at {}", address); + /* + * There may be unregister tasks scheduled to run on the event loop that use serverHelperExecutor. + */ + eventloop.submit(new Runnable() + { + @Override + public void run() + { + serverHelperExecutor.shutdown(); + storageHelperExecutor.shutdown(); + try { + serverHelperExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS); + } catch (InterruptedException ex) { + logger.debug("Executor Termination", ex); + } + logger.info("Server stopped listening at {}", address); + } + }); } public synchronized InetSocketAddress run(EventLoop eventloop) @@ -165,13 +178,12 @@ public class Server implements ServerListener @Override public String toString() { - return identity; + return getClass().getSimpleName() + '@' + Integer.toHexString(hashCode()) + "{address=" + address + "}"; } private final ConcurrentHashMap<String, DataList> publisherBuffers = new ConcurrentHashMap<>(1, 0.75f, 1); private final ConcurrentHashMap<String, LogicalNode> subscriberGroups = new ConcurrentHashMap<String, LogicalNode>(); private final ConcurrentHashMap<String, AbstractLengthPrependerClient> publisherChannels = new ConcurrentHashMap<>(); - private final ConcurrentHashMap<String, AbstractLengthPrependerClient> subscriberChannels = new ConcurrentHashMap<>(); private final int blockSize; private final int numberOfCacheBlocks; @@ -235,81 +247,70 @@ public class Server implements ServerListener /** * * @param request - * @param connection - * @return + * @param key */ - public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request, - final AbstractLengthPrependerClient connection) + public void handleSubscriberRequest(final SubscribeRequestTuple request, final SelectionKey key) { - String identifier = request.getIdentifier(); - String type = request.getStreamType(); - String upstream_identifier = request.getUpstreamIdentifier(); - - // Check if there is a logical node of this type, if not create it. - final LogicalNode ln; - if (subscriberGroups.containsKey(type)) { - //logger.debug("adding to exiting group = {}", subscriberGroups.get(type)); - /* - * close previous connection with the same identifier which is guaranteed to be unique. - */ - AbstractLengthPrependerClient previous = subscriberChannels.put(identifier, connection); - if (previous != null) { - eventloop.disconnect(previous); - } - - ln = subscriberGroups.get(type); + try { serverHelperExecutor.submit(new Runnable() { @Override public void run() { - ln.boot(eventloop); - ln.addConnection(connection); - ln.catchUp(); - } - }); - } else { - /* - * if there is already a datalist registered for the type in which this client is interested, - * then get a iterator on the data items of that data list. If the datalist is not registered, - * then create one and register it. Hopefully this one would be used by future upstream nodes. - */ - final DataList dl; - if (publisherBuffers.containsKey(upstream_identifier)) { - dl = publisherBuffers.get(upstream_identifier); - //logger.debug("old list = {}", dl); - } else { - dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? - new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks) : - new DataList(upstream_identifier, blockSize, numberOfCacheBlocks); - publisherBuffers.put(upstream_identifier, dl); - //logger.debug("new list = {}", dl); - } + final String upstream_identifier = request.getUpstreamIdentifier(); + + /* + * if there is already a datalist registered for the type in which this client is interested, + * then get a iterator on the data items of that data list. If the datalist is not registered, + * then create one and register it. Hopefully this one would be used by future upstream nodes. + */ + DataList dl = publisherBuffers.get(upstream_identifier); + if (dl == null) { + dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? + new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks) : + new DataList(upstream_identifier, blockSize, numberOfCacheBlocks); + DataList odl = publisherBuffers.putIfAbsent(upstream_identifier, dl); + if (odl != null) { + dl = odl; + } + } - long skipWindowId = (long)request.getBaseSeconds() << 32 | request.getWindowId(); - ln = new LogicalNode(identifier, upstream_identifier, type, dl.newIterator(skipWindowId), skipWindowId); + final String identifier = request.getIdentifier(); + final String type = request.getStreamType(); + final long skipWindowId = (long)request.getBaseSeconds() << 32 | request.getWindowId(); + final LogicalNode ln = new LogicalNode(identifier, upstream_identifier, type, dl + .newIterator(skipWindowId), skipWindowId); - int mask = request.getMask(); - if (mask != 0) { - for (Integer bs : request.getPartitions()) { - ln.addPartition(bs, mask); - } - } + int mask = request.getMask(); + if (mask != 0) { + for (Integer bs : request.getPartitions()) { + ln.addPartition(bs, mask); + } + } + final LogicalNode oln = subscriberGroups.put(type, ln); + if (oln != null) { + oln.boot(eventloop); + } + AbstractLengthPrependerClient subscriber = new Subscriber(ln, request.getBufferSize()); + + subscriber.registered(key); + key.attach(subscriber); + key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ); - subscriberGroups.put(type, ln); - serverHelperExecutor.submit(new Runnable() - { - @Override - public void run() - { - ln.addConnection(connection); ln.catchUp(); dl.addDataListener(ln); } }); + } catch (RejectedExecutionException e) { + logger.error("Received subscriber request {} after server {} termination. Disconnecting {}", request, this, key.channel(), e); + if (key.isValid()) { + try { + key.channel().close(); + } catch (IOException ioe) { + logger.error("Failed to close channel {}", key.channel(), ioe); + } + } } - - return ln; } /** @@ -322,9 +323,9 @@ public class Server implements ServerListener { String identifier = request.getIdentifier(); - DataList dl; + DataList dl = publisherBuffers.get(identifier); - if (publisherBuffers.containsKey(identifier)) { + if (dl != null) { /* * close previous connection with the same identifier which is guaranteed to be unique. */ @@ -333,7 +334,6 @@ public class Server implements ServerListener eventloop.disconnect(previous); } - dl = publisherBuffers.get(identifier); try { dl.rewind(request.getBaseSeconds(), request.getWindowId()); } catch (IOException ie) { @@ -343,7 +343,10 @@ public class Server implements ServerListener dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? new FastDataList(identifier, blockSize, numberOfCacheBlocks) : new DataList(identifier, blockSize, numberOfCacheBlocks); - publisherBuffers.put(identifier, dl); + DataList odl = publisherBuffers.putIfAbsent(identifier, dl); + if (odl != null) { + dl = odl; + } } dl.setSecondaryStorage(storage, storageHelperExecutor); @@ -468,39 +471,7 @@ public class Server implements ServerListener ignore = true; logger.info("Received subscriber request: {}", request); - SubscribeRequestTuple subscriberRequest = (SubscribeRequestTuple)request; - AbstractLengthPrependerClient subscriber; - -// /* for backward compatibility - set the buffer size to 16k - EXPERIMENTAL */ - int bufferSize = subscriberRequest.getBufferSize(); -// if (bufferSize == 0) { -// bufferSize = 16 * 1024; -// } - if (subscriberRequest.getVersion().equals(Tuple.FAST_VERSION)) { - subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), - subscriberRequest.getPartitions(), bufferSize); - } else { - subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), - subscriberRequest.getPartitions(), bufferSize) - { - @Override - public int readSize() - { - if (writeOffset - readOffset < 2) { - return -1; - } - - short s = buffer[readOffset++]; - return s | (buffer[readOffset++] << 8); - } - - }; - } - key.attach(subscriber); - key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ); - subscriber.registered(key); - - handleSubscriberRequest(subscriberRequest, subscriber); + handleSubscriberRequest((SubscribeRequestTuple)request, key); break; case PURGE_REQUEST: @@ -530,16 +501,13 @@ public class Server implements ServerListener class Subscriber extends AbstractLengthPrependerClient { - private final String type; - private final int mask; - private final int[] partitions; + private LogicalNode ln; - Subscriber(String type, int mask, int[] partitions, int bufferSize) + Subscriber(LogicalNode ln, int bufferSize) { super(1024, bufferSize); - this.type = type; - this.mask = mask; - this.partitions = partitions; + this.ln = ln; + ln.addConnection(this); super.write = false; } @@ -553,58 +521,49 @@ public class Server implements ServerListener @Override public void unregistered(final SelectionKey key) { - super.unregistered(key); - teardown(); - } - - @Override - public void handleException(Exception cce, EventLoop el) - { - teardown(); - super.handleException(cce, el); - } + try { + serverHelperExecutor.submit(new Runnable() + { + @Override + public void run() + { + teardown(); + } - @Override - public String toString() - { - return "Server.Subscriber{" + "type=" + type + ", mask=" + mask + - ", partitions=" + (partitions == null ? "null" : Arrays.toString(partitions)) + '}'; + @Override + public String toString() + { + return getClass().getSimpleName() + '@' + Integer.toHexString(hashCode()) + + " teardown " + Subscriber.this; + } + }); + } catch (Exception e) { + logger.error("{}", this, e); + } + super.unregistered(key); } - private volatile boolean torndown; - private void teardown() { - //logger.debug("Teardown is being called {}", torndown, new Exception()); - if (torndown) { - return; - } - torndown = true; - - LogicalNode ln = subscriberGroups.get(type); if (ln != null) { - if (subscriberChannels.containsValue(this)) { - final Iterator<Entry<String, AbstractLengthPrependerClient>> i = subscriberChannels.entrySet().iterator(); - while (i.hasNext()) { - if (i.next().getValue() == this) { - i.remove(); - break; - } - } - } - - ln.removeChannel(this); + ln.removeChannel(Subscriber.this); if (ln.getPhysicalNodeCount() == 0) { DataList dl = publisherBuffers.get(ln.getUpstream()); if (dl != null) { dl.removeDataListener(ln); } - subscriberGroups.remove(ln.getGroup()); + subscriberGroups.remove(ln.getGroup(), ln); + ln.getIterator().close(); + ln = null; } - ln.getIterator().close(); } } + @Override + public String toString() + { + return "Server.Subscriber{" + "ln=" + ln + "}"; + } } /**
