During request processing, when handling is passed from one client to the next in the chain, discard the remaining data in the current client instead of processing it there
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/6db66a6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/6db66a6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/6db66a6c

Branch: refs/heads/devel-3.0
Commit: 6db66a6c2628b36d2caed2a50b8fa47887f72c7f
Parents: 0c65d27
Author: Pramod Immaneni <[email protected]>
Authored: Tue Jul 21 13:19:25 2015 -0700
Committer: Chetan Narsude <[email protected]>
Committed: Tue Aug 4 09:13:08 2015 -0700

----------------------------------------------------------------------
 .../datatorrent/bufferserver/server/Server.java | 24 ++++++++------------
 1 file changed, 10 insertions(+), 14 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/6db66a6c/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index 7c21c6d..683eb8d 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -352,15 +352,9 @@ public class Server implements ServerListener
 
   class AuthClient extends com.datatorrent.bufferserver.client.AuthClient
   {
-    boolean ignore;
-
     @Override
     public void onMessage(byte[] buffer, int offset, int size)
     {
-      if (ignore) {
-        return;
-      }
-
       authenticateMessage(buffer, offset, size);
 
       unregistered(key);
@@ -374,21 +368,18 @@ public class Server implements ServerListener
         client.transferBuffer(buffer, readOffset + size, len);
       }
 
-      ignore = true;
+      // Remaining data has been transferred to next client in the chain and is going to be processed there so we would
+      // not be processing it here, hence discarding it
+      discardReadBuffer();
     }
   }
 
   class UnidentifiedClient extends SeedDataClient
   {
-    boolean ignore;
 
     @Override
     public void onMessage(byte[] buffer, int offset, int size)
     {
-      if (ignore) {
-        return;
-      }
-
       Tuple request = Tuple.getTuple(buffer, offset, size);
       switch (request.getType()) {
         case PUBLISHER_REQUEST:
@@ -432,7 +423,10 @@ public class Server implements ServerListener
           if (len > 0) {
             publisher.transferBuffer(this.buffer, readOffset + size, len);
           }
-          ignore = true;
+
+          // Remaining data transferred to next client and being processed there, not processed here anymore hence
+          // discarding it
+          discardReadBuffer();
 
           break;
 
@@ -441,7 +435,9 @@ public class Server implements ServerListener
            * unregister the unidentified client since its job is done!
            */
           unregistered(key);
-          ignore = true;
+          // Control is being transferred to next client in the chain so no more processing in this client after this
+          // message
+          discardReadBuffer();
           logger.info("Received subscriber request: {}", request);
 
           SubscribeRequestTuple subscriberRequest = (SubscribeRequestTuple)request;
