Repository: apex-core Updated Branches: refs/heads/master 41aea840d -> ad4210ba7
APEXCORE-456 - Explicitly limit Server.Subscriber to one way communication Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/ad4210ba Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/ad4210ba Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/ad4210ba Branch: refs/heads/master Commit: ad4210ba7052feb5545e5f7d30095a404b7e61c3 Parents: 41aea84 Author: Vlad Rozov <[email protected]> Authored: Mon Dec 19 18:17:25 2016 -0800 Committer: Vlad Rozov <[email protected]> Committed: Mon Mar 6 07:28:25 2017 -0800 ---------------------------------------------------------------------- .../bufferserver/internal/LogicalNode.java | 26 ++++----- .../bufferserver/internal/PhysicalNode.java | 38 +++++-------- .../datatorrent/bufferserver/server/Server.java | 56 +++++++++++++------- 3 files changed, 64 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/ad4210ba/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java index c08cfb9..2921128 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java @@ -32,8 +32,8 @@ import com.datatorrent.bufferserver.policy.Policy; import com.datatorrent.bufferserver.util.BitVector; import com.datatorrent.bufferserver.util.Codec; import com.datatorrent.bufferserver.util.SerializedData; -import com.datatorrent.netlet.AbstractLengthPrependerClient; import com.datatorrent.netlet.EventLoop; +import com.datatorrent.netlet.WriteOnlyClient; /** * LogicalNode represents a logical node in a DAG<p> @@ -54,6 +54,7 @@ public class LogicalNode implements DataListener private final Policy policy = GiveAll.getInstance(); private final DataListIterator iterator; private final long skipWindowId; + private final EventLoop eventloop; private long baseSeconds; private boolean caughtup; @@ -65,7 +66,7 @@ public class LogicalNode implements DataListener * @param iterator * @param skipWindowId */ - public LogicalNode(String identifier, String upstream, String group, DataListIterator iterator, long skipWindowId) + public LogicalNode(String identifier, String upstream, String group, DataListIterator iterator, long skipWindowId, EventLoop eventloop) { this.identifier = identifier; this.upstream = upstream; @@ -74,6 +75,7 @@ public class LogicalNode implements DataListener this.partitions = new HashSet<BitVector>(); this.iterator = iterator; this.skipWindowId = skipWindowId; + this.eventloop = eventloop; } /** @@ -99,7 +101,7 @@ public class LogicalNode implements DataListener * * @param connection */ - public void addConnection(AbstractLengthPrependerClient connection) + public void addConnection(WriteOnlyClient connection) { PhysicalNode pn = new PhysicalNode(connection); if (!physicalNodes.contains(pn)) { @@ -111,7 +113,7 @@ public class LogicalNode implements DataListener * * @param client */ - public void removeChannel(AbstractLengthPrependerClient client) + public void removeChannel(WriteOnlyClient client) { for (PhysicalNode pn : physicalNodes) { if (pn.getClient() == client) { @@ -138,9 +140,7 @@ public class LogicalNode implements DataListener if (!ready) { ready = true; for (PhysicalNode pn : physicalNodes) { - if (pn.isBlocked()) { - ready = pn.unblock() & ready; - } + ready = pn.unblock() & ready; } } @@ -215,8 +215,9 @@ public class LogicalNode implements DataListener physicalNodes); } } - } catch (InterruptedException ie) { - throw new RuntimeException(ie); + } catch (Exception e) { + logger.error("Disconnecting {}", this, e); + boot(); } if (iterator.hasNext()) { @@ -293,8 +294,9 @@ public class LogicalNode implements DataListener } } } - } catch (InterruptedException ie) { - throw new RuntimeException(ie); + } catch (Exception e) { + logger.error("Disconnecting {}", this, e); + boot(); } } else { catchUp(); @@ -341,7 +343,7 @@ public class LogicalNode implements DataListener return identifier; } - public void boot(EventLoop eventloop) + public void boot() { for (PhysicalNode pn : physicalNodes) { eventloop.disconnect(pn.getClient()); http://git-wip-us.apache.org/repos/asf/apex-core/blob/ad4210ba/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java index 424a51a..456e7e7 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java @@ -23,7 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.datatorrent.bufferserver.util.SerializedData; -import com.datatorrent.netlet.AbstractLengthPrependerClient; +import com.datatorrent.netlet.WriteOnlyClient; /** * PhysicalNode represents one physical subscriber. @@ -32,16 +32,16 @@ import com.datatorrent.netlet.AbstractLengthPrependerClient; */ public class PhysicalNode { - public static final int BUFFER_SIZE = 8 * 1024; private final long starttime; - private final AbstractLengthPrependerClient client; - private final long processedMessageCount; + private final WriteOnlyClient client; + private long processedMessageCount; + private SerializedData blocker; /** * * @param client */ - public PhysicalNode(AbstractLengthPrependerClient client) + public PhysicalNode(WriteOnlyClient client) { this.client = client; starttime = System.currentTimeMillis(); @@ -71,21 +71,16 @@ public class PhysicalNode * @param d * @throws InterruptedException */ - private SerializedData blocker; - public boolean send(SerializedData d) { - if (d.offset == d.dataOffset) { - if (client.write(d.buffer, d.offset, d.length)) { - return true; - } - } else { - if (client.send(d.buffer, d.offset, d.length)) { - return true; - } + if (client.send(d.buffer, d.dataOffset, d.length - (d.dataOffset - d.offset))) { + return true; + } + if (blocker == null) { + blocker = d; + } else if (blocker != d) { + throw new IllegalStateException(String.format("Can't send data %s while blocker %s is pending on %s", d, blocker, this)); } - - blocker = d; return false; } @@ -95,7 +90,7 @@ public class PhysicalNode return true; } - if (send(blocker)) { + if (client.send(blocker.buffer, blocker.dataOffset, blocker.length - (blocker.dataOffset - blocker.offset))) { blocker = null; return true; } @@ -103,11 +98,6 @@ public class PhysicalNode return false; } - public boolean isBlocked() - { - return blocker != null; - } - /** * * @return long @@ -150,7 +140,7 @@ public class PhysicalNode /** * @return the channel */ - public AbstractLengthPrependerClient getClient() + public WriteOnlyClient getClient() { return client; } http://git-wip-us.apache.org/repos/asf/apex-core/blob/ad4210ba/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index f819bb0..e0fe704 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -25,7 +25,6 @@ import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; -import java.util.Arrays; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.ArrayBlockingQueue; @@ -54,6 +53,7 @@ import com.datatorrent.netlet.AbstractLengthPrependerClient; import com.datatorrent.netlet.DefaultEventLoop; import com.datatorrent.netlet.EventLoop; import com.datatorrent.netlet.Listener.ServerListener; +import com.datatorrent.netlet.WriteOnlyLengthPrependerClient; import com.datatorrent.netlet.util.VarInt; /** @@ -116,7 +116,7 @@ public class Server implements ServerListener public void unregistered(SelectionKey key) { for (LogicalNode ln : subscriberGroups.values()) { - ln.boot(eventloop); + ln.boot(); } /* * There may be unregister tasks scheduled to run on the event loop that use serverHelperExecutor. @@ -281,7 +281,7 @@ public class Server implements ServerListener final String type = request.getStreamType(); final long skipWindowId = (long)request.getBaseSeconds() << 32 | request.getWindowId(); final LogicalNode ln = new LogicalNode(identifier, upstream_identifier, type, dl - .newIterator(skipWindowId), skipWindowId); + .newIterator(skipWindowId), skipWindowId, eventloop); int mask = request.getMask(); if (mask != 0) { @@ -291,16 +291,19 @@ public class Server implements ServerListener } final LogicalNode oln = subscriberGroups.put(type, ln); if (oln != null) { - oln.boot(eventloop); + oln.boot(); } - AbstractLengthPrependerClient subscriber = new Subscriber(ln, request.getBufferSize()); - - subscriber.registered(key); - key.attach(subscriber); - key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ); - - ln.catchUp(); - dl.addDataListener(ln); + final Subscriber subscriber = new Subscriber(ln, request.getBufferSize()); + eventloop.submit(new Runnable() + { + @Override + public void run() + { + key.attach(subscriber); + subscriber.registered(key); + subscriber.connected(); + } + }); } }); } catch (RejectedExecutionException e) { @@ -515,7 +518,7 @@ public class Server implements ServerListener /* * unregister the unidentified client since its job is done! */ - unregistered(key); + unregistered(key.interestOps(0)); ignore = true; logger.info("Received subscriber request: {}", request); @@ -547,23 +550,36 @@ public class Server implements ServerListener } - class Subscriber extends AbstractLengthPrependerClient + private class Subscriber extends WriteOnlyLengthPrependerClient { private LogicalNode ln; Subscriber(LogicalNode ln, int bufferSize) { - super(1024, bufferSize); + super(1024 * 1024, bufferSize == 0 ? 256 * 1024 : bufferSize); this.ln = ln; ln.addConnection(this); - super.write = false; } @Override - public void onMessage(byte[] buffer, int offset, int size) + public void connected() { - logger.warn("Received data when no data is expected: {}", - Arrays.toString(Arrays.copyOfRange(buffer, offset, offset + size))); + super.connected(); + serverHelperExecutor.submit(new Runnable() + { + @Override + public void run() + { + final DataList dl = publisherBuffers.get(ln.getUpstream()); + if (dl != null) { + ln.catchUp(); + dl.addDataListener(ln); + } else { + logger.error("Disconnecting {} with no matching data list.", this); + ln.boot(); + } + } + }); } @Override @@ -802,7 +818,7 @@ public class Server implements ServerListener } for (LogicalNode ln : list) { - ln.boot(eventloop); + ln.boot(); } }
