APEXCORE-583 - Buffer Server LogicalNode should not be reused by Subscribers
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/d1646e42 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/d1646e42 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/d1646e42 Branch: refs/heads/master Commit: d1646e42bdf5594ef34070594733a7ca10123a3f Parents: d28c0dd Author: Vlad Rozov <[email protected]> Authored: Tue Dec 6 14:09:03 2016 -0800 Committer: Vlad Rozov <[email protected]> Committed: Tue Dec 6 14:09:03 2016 -0800 ---------------------------------------------------------------------- .../datatorrent/bufferserver/server/Server.java | 98 +++++++++++--------- 1 file changed, 55 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/d1646e42/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index e720248..af55143 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -249,7 +249,7 @@ public class Server implements ServerListener * @param request * @param key */ - public void handleSubscriberRequest(final SubscribeRequestTuple request, final SelectionKey key) + private void handleSubscriberRequest(final SubscribeRequestTuple request, final SelectionKey key) { try { serverHelperExecutor.submit(new Runnable() @@ -259,11 +259,11 @@ public class Server implements ServerListener { final String upstream_identifier = request.getUpstreamIdentifier(); - /* - * if there is already a datalist registered for the type in which this client is interested, - * then get a iterator on the data items of that data list. If the datalist is not registered, - * then create one and register it. Hopefully this one would be used by future upstream nodes. - */ + /* + * if there is already a datalist registered for the type in which this client is interested, + * then get a iterator on the data items of that data list. If the datalist is not registered, + * then create one and register it. Hopefully this one would be used by future upstream nodes. + */ DataList dl = publisherBuffers.get(upstream_identifier); if (dl == null) { dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? @@ -302,7 +302,7 @@ public class Server implements ServerListener } }); } catch (RejectedExecutionException e) { - logger.error("Received subscriber request {} after server {} termination. Disconnecting {}", request, this, key.channel(), e); + logger.error("Received subscriber request {} after server {} termination. Disconnecting {}.", request, this, key.channel(), e); if (key.isValid()) { try { key.channel().close(); @@ -313,6 +313,52 @@ public class Server implements ServerListener } } + private void handleSubscriberTeardown(final SelectionKey key) + { + try { + final Subscriber subscriber = (Subscriber)key.attachment(); + if (subscriber != null) { + serverHelperExecutor.submit(new Runnable() + { + @Override + public void run() + { + try { + final LogicalNode ln = subscriber.ln; + if (ln != null) { + ln.removeChannel(subscriber); + if (ln.getPhysicalNodeCount() == 0) { + DataList dl = publisherBuffers.get(ln.getUpstream()); + if (dl != null) { + logger.info("Removing ln {} from dl {}", ln, dl); + dl.removeDataListener(ln); + } + subscriberGroups.remove(ln.getGroup(), ln); + ln.getIterator().close(); + } + subscriber.ln = null; + } + } catch (Throwable t) { + logger.error("Buffer server {} failed to tear down subscriber {}.", Server.this, subscriber, t); + } + } + + @Override + public String toString() + { + return subscriber + " teardown task."; + } + }); + } else { + logger.error("Selection key {} has unexpected attachment {}.", key, key.attachment()); + } + } catch (ClassCastException e) { + logger.error("Selection key {} has unexpected attachment {}.", key, key.attachment()); + } catch (RejectedExecutionException e) { + logger.error("Subscriber {} teardown after server {} termination.", key.attachment(), this, e); + } + } + /** * * @param request @@ -521,48 +567,14 @@ public class Server implements ServerListener @Override public void unregistered(final SelectionKey key) { - try { - serverHelperExecutor.submit(new Runnable() - { - @Override - public void run() - { - teardown(); - } - - @Override - public String toString() - { - return getClass().getSimpleName() + '@' + Integer.toHexString(hashCode()) + - " teardown " + Subscriber.this; - } - }); - } catch (Exception e) { - logger.error("{}", this, e); - } + handleSubscriberTeardown(key); super.unregistered(key); } - private void teardown() - { - if (ln != null) { - ln.removeChannel(Subscriber.this); - if (ln.getPhysicalNodeCount() == 0) { - DataList dl = publisherBuffers.get(ln.getUpstream()); - if (dl != null) { - dl.removeDataListener(ln); - } - subscriberGroups.remove(ln.getGroup(), ln); - ln.getIterator().close(); - ln = null; - } - } - } - @Override public String toString() { - return "Server.Subscriber{" + "ln=" + ln + "}"; + return getClass().getSimpleName() + '@' + Integer.toHexString(hashCode()) + "{ln=" + ln + "}"; } }
