Repository: incubator-apex-core Updated Branches: refs/heads/release-3.3 c96ed4d6b -> 687581111
APEXCORE-375 - Container killed because of Out of Sequence tuple error. Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/5a8ce1a5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/5a8ce1a5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/5a8ce1a5 Branch: refs/heads/release-3.3 Commit: 5a8ce1a51d86ac66ae9dd9cecc5ff02f6ede6370 Parents: c96ed4d Author: Vlad Rozov <[email protected]> Authored: Sun Mar 6 10:38:34 2016 -0800 Committer: Pramod Immaneni <[email protected]> Committed: Mon Mar 7 16:02:18 2016 -0800 ---------------------------------------------------------------------- .../bufferserver/internal/LogicalNode.java | 25 ++++++------ .../datatorrent/bufferserver/server/Server.java | 42 ++++++++++++-------- 2 files changed, 38 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/5a8ce1a5/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java index 9856829..761bbea 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java @@ -156,19 +156,20 @@ public class LogicalNode implements DataListener */ public void catchUp() { - long lBaseSeconds = (long)iterator.getBaseSeconds() << 32; - logger.debug("BaseSeconds = {} and lBaseSeconds = {}", Codec.getStringWindowId(baseSeconds), - Codec.getStringWindowId(lBaseSeconds)); - if (lBaseSeconds > baseSeconds) { - baseSeconds = lBaseSeconds; - } - logger.debug("Set the base seconds to {}", Codec.getStringWindowId(baseSeconds)); - int intervalMillis; - - int skippedPayloadTuples = 0; - + caughtup = false; if (isReady()) { logger.debug("catching up {}->{}", upstream, group); + + long lBaseSeconds = (long)iterator.getBaseSeconds() << 32; + logger.debug("BaseSeconds = {} and lBaseSeconds = {}", Codec.getStringWindowId(baseSeconds), Codec.getStringWindowId(lBaseSeconds)); + if (lBaseSeconds > baseSeconds) { + baseSeconds = lBaseSeconds; + } + logger.debug("Set the base seconds to {}", Codec.getStringWindowId(baseSeconds)); + int intervalMillis; + + int skippedPayloadTuples = 0; + try { /* * fast forward to catch up with the windowId without consuming @@ -337,8 +338,8 @@ public class LogicalNode implements DataListener { for (PhysicalNode pn : physicalNodes) { eventloop.disconnect(pn.getClient()); - physicalNodes.clear(); } + physicalNodes.clear(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/5a8ce1a5/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index 76a0140..353eb2b 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -222,14 +222,15 @@ public class Server implements ServerListener * @param connection * @return */ - public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request, AbstractLengthPrependerClient connection) + public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request, + final AbstractLengthPrependerClient connection) { String identifier = request.getIdentifier(); String type = request.getStreamType(); String upstream_identifier = request.getUpstreamIdentifier(); // Check if there is a logical node of this type, if not create it. - LogicalNode ln; + final LogicalNode ln; if (subscriberGroups.containsKey(type)) { //logger.debug("adding to exiting group = {}", subscriberGroups.get(type)); /* @@ -241,15 +242,23 @@ public class Server implements ServerListener } ln = subscriberGroups.get(type); - ln.boot(eventloop); - ln.addConnection(connection); + serverHelperExecutor.submit(new Runnable() + { + @Override + public void run() + { + ln.boot(eventloop); + ln.addConnection(connection); + ln.catchUp(); + } + }); } else { /* * if there is already a datalist registered for the type in which this client is interested, * then get a iterator on the data items of that data list. If the datalist is not registered, * then create one and register it. Hopefully this one would be used by future upstream nodes. */ - DataList dl; + final DataList dl; if (publisherBuffers.containsKey(upstream_identifier)) { dl = publisherBuffers.get(upstream_identifier); //logger.debug("old list = {}", dl); @@ -275,8 +284,16 @@ public class Server implements ServerListener } subscriberGroups.put(type, ln); - ln.addConnection(connection); - dl.addDataListener(ln); + serverHelperExecutor.submit(new Runnable() + { + @Override + public void run() + { + ln.addConnection(connection); + ln.catchUp(); + dl.addDataListener(ln); + } + }); } return ln; @@ -469,16 +486,7 @@ public class Server implements ServerListener key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ); subscriber.registered(key); - final LogicalNode logicalNode = handleSubscriberRequest(subscriberRequest, subscriber); - serverHelperExecutor.submit(new Runnable() - { - @Override - public void run() - { - logicalNode.catchUp(); - } - - }); + handleSubscriberRequest(subscriberRequest, subscriber); break; case PURGE_REQUEST:
