Repository: apex-core Updated Branches: refs/heads/master 891ed3ae9 -> 4a1570df9
APEXCORE-456 - Explicitly limit Server.Subscriber to one way communication Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/308f1a7b Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/308f1a7b Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/308f1a7b Branch: refs/heads/master Commit: 308f1a7b0e91902ae5e6edbb614e7ea5ce975417 Parents: 5fb9d04 Author: Vlad Rozov <[email protected]> Authored: Fri Nov 18 19:44:40 2016 -0800 Committer: Vlad Rozov <[email protected]> Committed: Sun Nov 20 17:42:13 2016 -0800 ---------------------------------------------------------------------- .../bufferserver/internal/LogicalNode.java | 6 +- .../bufferserver/internal/PhysicalNode.java | 25 ++-- .../datatorrent/bufferserver/server/Server.java | 117 +++++++------------ 3 files changed, 54 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/308f1a7b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java index c08cfb9..ceb9469 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java @@ -32,8 +32,8 @@ import com.datatorrent.bufferserver.policy.Policy; import com.datatorrent.bufferserver.util.BitVector; import com.datatorrent.bufferserver.util.Codec; import com.datatorrent.bufferserver.util.SerializedData; -import com.datatorrent.netlet.AbstractLengthPrependerClient; import com.datatorrent.netlet.EventLoop; +import com.datatorrent.netlet.WriteOnlyClient; /** * LogicalNode represents a logical node in a DAG<p> @@ -99,7 +99,7 @@ public class LogicalNode implements DataListener * * @param connection */ - public void addConnection(AbstractLengthPrependerClient connection) + public void addConnection(WriteOnlyClient connection) { PhysicalNode pn = new PhysicalNode(connection); if (!physicalNodes.contains(pn)) { @@ -111,7 +111,7 @@ public class LogicalNode implements DataListener * * @param client */ - public void removeChannel(AbstractLengthPrependerClient client) + public void removeChannel(WriteOnlyClient client) { for (PhysicalNode pn : physicalNodes) { if (pn.getClient() == client) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/308f1a7b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java index 424a51a..9a3fe37 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java @@ -23,7 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.datatorrent.bufferserver.util.SerializedData; -import com.datatorrent.netlet.AbstractLengthPrependerClient; +import com.datatorrent.netlet.WriteOnlyClient; /** * PhysicalNode represents one physical subscriber. @@ -32,16 +32,16 @@ import com.datatorrent.netlet.AbstractLengthPrependerClient; */ public class PhysicalNode { - public static final int BUFFER_SIZE = 8 * 1024; private final long starttime; - private final AbstractLengthPrependerClient client; - private final long processedMessageCount; + private final WriteOnlyClient client; + private long processedMessageCount; + private SerializedData blocker; /** * * @param client */ - public PhysicalNode(AbstractLengthPrependerClient client) + public PhysicalNode(WriteOnlyClient client) { this.client = client; starttime = System.currentTimeMillis(); @@ -71,20 +71,11 @@ public class PhysicalNode * @param d * @throws InterruptedException */ - private SerializedData blocker; - public boolean send(SerializedData d) { - if (d.offset == d.dataOffset) { - if (client.write(d.buffer, d.offset, d.length)) { - return true; - } - } else { - if (client.send(d.buffer, d.offset, d.length)) { - return true; - } + if (client.send(d.buffer, d.dataOffset, d.length - (d.dataOffset - d.offset))) { + return true; } - blocker = d; return false; } @@ -150,7 +141,7 @@ public class PhysicalNode /** * @return the channel */ - public AbstractLengthPrependerClient getClient() + public WriteOnlyClient getClient() { return client; } http://git-wip-us.apache.org/repos/asf/apex-core/blob/308f1a7b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index 12eed5f..baa4e0b 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -54,6 +54,7 @@ import com.datatorrent.netlet.AbstractLengthPrependerClient; import com.datatorrent.netlet.DefaultEventLoop; import com.datatorrent.netlet.EventLoop; import com.datatorrent.netlet.Listener.ServerListener; +import com.datatorrent.netlet.WriteOnlyLengthPrependerClient; import com.datatorrent.netlet.util.VarInt; /** @@ -171,7 +172,7 @@ public class Server implements ServerListener private final ConcurrentHashMap<String, DataList> publisherBuffers = new ConcurrentHashMap<>(1, 0.75f, 1); private final ConcurrentHashMap<String, LogicalNode> subscriberGroups = new ConcurrentHashMap<String, LogicalNode>(); private final ConcurrentHashMap<String, AbstractLengthPrependerClient> publisherChannels = new ConcurrentHashMap<>(); - private final ConcurrentHashMap<String, AbstractLengthPrependerClient> subscriberChannels = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<String, ClientListener> subscriberChannels = new ConcurrentHashMap<>(); private final int blockSize; private final int numberOfCacheBlocks; @@ -235,15 +236,18 @@ public class Server implements ServerListener /** * * @param request - * @param connection + * @param key * @return */ - public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request, - final AbstractLengthPrependerClient connection) + public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request, SelectionKey key) { String identifier = request.getIdentifier(); String type = request.getStreamType(); String upstream_identifier = request.getUpstreamIdentifier(); + final Subscriber subscriber = new Subscriber(type, request.getMask(), request.getPartitions(), request.getBufferSize()); + key.attach(subscriber); + subscriber.registered(key); + subscriber.connected(); // Check if there is a logical node of this type, if not create it. final LogicalNode ln; @@ -252,7 +256,7 @@ public class Server implements ServerListener /* * close previous connection with the same identifier which is guaranteed to be unique. */ - AbstractLengthPrependerClient previous = subscriberChannels.put(identifier, connection); + ClientListener previous = subscriberChannels.put(identifier, subscriber); if (previous != null) { eventloop.disconnect(previous); } @@ -264,7 +268,7 @@ public class Server implements ServerListener public void run() { ln.boot(eventloop); - ln.addConnection(connection); + ln.addConnection(subscriber); ln.catchUp(); } }); @@ -302,7 +306,7 @@ public class Server implements ServerListener @Override public void run() { - ln.addConnection(connection); + ln.addConnection(subscriber); ln.catchUp(); dl.addDataListener(ln); } @@ -312,6 +316,32 @@ public class Server implements ServerListener return ln; } + private void teardownSubscriber(Subscriber subscriber) + { + LogicalNode ln = subscriberGroups.get(subscriber.type); + if (ln != null) { + if (subscriberChannels.containsValue(subscriber)) { + final Iterator<Entry<String, ClientListener>> i = subscriberChannels.entrySet().iterator(); + while (i.hasNext()) { + if (i.next().getValue() == subscriber) { + i.remove(); + break; + } + } + } + + ln.removeChannel(subscriber); + if (ln.getPhysicalNodeCount() == 0) { + DataList dl = publisherBuffers.get(ln.getUpstream()); + if (dl != null) { + dl.removeDataListener(ln); + } + subscriberGroups.remove(ln.getGroup()); + } + ln.getIterator().close(); + } + } + /** * * @param request @@ -464,43 +494,11 @@ public class Server implements ServerListener /* * unregister the unidentified client since its job is done! */ - unregistered(key); + unregistered(key.interestOps(0)); ignore = true; logger.info("Received subscriber request: {}", request); - SubscribeRequestTuple subscriberRequest = (SubscribeRequestTuple)request; - AbstractLengthPrependerClient subscriber; - -// /* for backward compatibility - set the buffer size to 16k - EXPERIMENTAL */ - int bufferSize = subscriberRequest.getBufferSize(); -// if (bufferSize == 0) { -// bufferSize = 16 * 1024; -// } - if (subscriberRequest.getVersion().equals(Tuple.FAST_VERSION)) { - subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), - subscriberRequest.getPartitions(), bufferSize); - } else { - subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), - subscriberRequest.getPartitions(), bufferSize) - { - @Override - public int readSize() - { - if (writeOffset - readOffset < 2) { - return -1; - } - - short s = buffer[readOffset++]; - return s | (buffer[readOffset++] << 8); - } - - }; - } - key.attach(subscriber); - key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ); - subscriber.registered(key); - - handleSubscriberRequest(subscriberRequest, subscriber); + handleSubscriberRequest((SubscribeRequestTuple)request, key); break; case PURGE_REQUEST: @@ -528,7 +526,7 @@ public class Server implements ServerListener } - class Subscriber extends AbstractLengthPrependerClient + private class Subscriber extends WriteOnlyLengthPrependerClient { private final String type; private final int mask; @@ -536,18 +534,11 @@ public class Server implements ServerListener Subscriber(String type, int mask, int[] partitions, int bufferSize) { - super(1024, bufferSize); + super(1024 * 1024, bufferSize == 0 ? 256 * 1024 : bufferSize); this.type = type; this.mask = mask; this.partitions = partitions; - super.write = false; - } - - @Override - public void onMessage(byte[] buffer, int offset, int size) - { - logger.warn("Received data when no data is expected: {}", - Arrays.toString(Arrays.copyOfRange(buffer, offset, offset + size))); + super.isWriteEnabled = false; } @Override @@ -580,29 +571,7 @@ public class Server implements ServerListener return; } torndown = true; - - LogicalNode ln = subscriberGroups.get(type); - if (ln != null) { - if (subscriberChannels.containsValue(this)) { - final Iterator<Entry<String, AbstractLengthPrependerClient>> i = subscriberChannels.entrySet().iterator(); - while (i.hasNext()) { - if (i.next().getValue() == this) { - i.remove(); - break; - } - } - } - - ln.removeChannel(this); - if (ln.getPhysicalNodeCount() == 0) { - DataList dl = publisherBuffers.get(ln.getUpstream()); - if (dl != null) { - dl.removeDataListener(ln); - } - subscriberGroups.remove(ln.getGroup()); - } - ln.getIterator().close(); - } + teardownSubscriber(this); } }
