Repository: apex-core
Updated Branches:
  refs/heads/master 41aea840d -> ad4210ba7


APEXCORE-456 - Explicitly limit Server.Subscriber to one way communication


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/ad4210ba
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/ad4210ba
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/ad4210ba

Branch: refs/heads/master
Commit: ad4210ba7052feb5545e5f7d30095a404b7e61c3
Parents: 41aea84
Author: Vlad Rozov <[email protected]>
Authored: Mon Dec 19 18:17:25 2016 -0800
Committer: Vlad Rozov <[email protected]>
Committed: Mon Mar 6 07:28:25 2017 -0800

----------------------------------------------------------------------
 .../bufferserver/internal/LogicalNode.java      | 26 ++++-----
 .../bufferserver/internal/PhysicalNode.java     | 38 +++++--------
 .../datatorrent/bufferserver/server/Server.java | 56 +++++++++++++-------
 3 files changed, 64 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/ad4210ba/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index c08cfb9..2921128 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -32,8 +32,8 @@ import com.datatorrent.bufferserver.policy.Policy;
 import com.datatorrent.bufferserver.util.BitVector;
 import com.datatorrent.bufferserver.util.Codec;
 import com.datatorrent.bufferserver.util.SerializedData;
-import com.datatorrent.netlet.AbstractLengthPrependerClient;
 import com.datatorrent.netlet.EventLoop;
+import com.datatorrent.netlet.WriteOnlyClient;
 
 /**
  * LogicalNode represents a logical node in a DAG<p>
@@ -54,6 +54,7 @@ public class LogicalNode implements DataListener
   private final Policy policy = GiveAll.getInstance();
   private final DataListIterator iterator;
   private final long skipWindowId;
+  private final EventLoop eventloop;
   private long baseSeconds;
   private boolean caughtup;
 
@@ -65,7 +66,7 @@ public class LogicalNode implements DataListener
    * @param iterator
    * @param skipWindowId
    */
-  public LogicalNode(String identifier, String upstream, String group, DataListIterator iterator, long skipWindowId)
+  public LogicalNode(String identifier, String upstream, String group, DataListIterator iterator, long skipWindowId, EventLoop eventloop)
   {
     this.identifier = identifier;
     this.upstream = upstream;
@@ -74,6 +75,7 @@ public class LogicalNode implements DataListener
     this.partitions = new HashSet<BitVector>();
     this.iterator = iterator;
     this.skipWindowId = skipWindowId;
+    this.eventloop = eventloop;
   }
 
   /**
@@ -99,7 +101,7 @@ public class LogicalNode implements DataListener
    *
    * @param connection
    */
-  public void addConnection(AbstractLengthPrependerClient connection)
+  public void addConnection(WriteOnlyClient connection)
   {
     PhysicalNode pn = new PhysicalNode(connection);
     if (!physicalNodes.contains(pn)) {
@@ -111,7 +113,7 @@ public class LogicalNode implements DataListener
    *
    * @param client
    */
-  public void removeChannel(AbstractLengthPrependerClient client)
+  public void removeChannel(WriteOnlyClient client)
   {
     for (PhysicalNode pn : physicalNodes) {
       if (pn.getClient() == client) {
@@ -138,9 +140,7 @@ public class LogicalNode implements DataListener
     if (!ready) {
       ready = true;
       for (PhysicalNode pn : physicalNodes) {
-        if (pn.isBlocked()) {
-          ready = pn.unblock() & ready;
-        }
+        ready = pn.unblock() & ready;
       }
     }
 
@@ -215,8 +215,9 @@ public class LogicalNode implements DataListener
                   physicalNodes);
           }
         }
-      } catch (InterruptedException ie) {
-        throw new RuntimeException(ie);
+      } catch (Exception e) {
+        logger.error("Disconnecting {}", this, e);
+        boot();
       }
 
       if (iterator.hasNext()) {
@@ -293,8 +294,9 @@ public class LogicalNode implements DataListener
               }
             }
           }
-        } catch (InterruptedException ie) {
-          throw new RuntimeException(ie);
+        } catch (Exception e) {
+          logger.error("Disconnecting {}", this, e);
+          boot();
         }
       } else {
         catchUp();
@@ -341,7 +343,7 @@ public class LogicalNode implements DataListener
     return identifier;
   }
 
-  public void boot(EventLoop eventloop)
+  public void boot()
   {
     for (PhysicalNode pn : physicalNodes) {
       eventloop.disconnect(pn.getClient());

http://git-wip-us.apache.org/repos/asf/apex-core/blob/ad4210ba/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java
index 424a51a..456e7e7 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java
@@ -23,7 +23,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.datatorrent.bufferserver.util.SerializedData;
-import com.datatorrent.netlet.AbstractLengthPrependerClient;
+import com.datatorrent.netlet.WriteOnlyClient;
 
 /**
  * PhysicalNode represents one physical subscriber.
@@ -32,16 +32,16 @@ import com.datatorrent.netlet.AbstractLengthPrependerClient;
  */
 public class PhysicalNode
 {
-  public static final int BUFFER_SIZE = 8 * 1024;
   private final long starttime;
-  private final AbstractLengthPrependerClient client;
-  private final long processedMessageCount;
+  private final WriteOnlyClient client;
+  private long processedMessageCount;
+  private SerializedData blocker;
 
   /**
    *
    * @param client
    */
-  public PhysicalNode(AbstractLengthPrependerClient client)
+  public PhysicalNode(WriteOnlyClient client)
   {
     this.client = client;
     starttime = System.currentTimeMillis();
@@ -71,21 +71,16 @@ public class PhysicalNode
    * @param d
    * @throws InterruptedException
    */
-  private SerializedData blocker;
-
   public boolean send(SerializedData d)
   {
-    if (d.offset == d.dataOffset) {
-      if (client.write(d.buffer, d.offset, d.length)) {
-        return true;
-      }
-    } else {
-      if (client.send(d.buffer, d.offset, d.length)) {
-        return true;
-      }
+    if (client.send(d.buffer, d.dataOffset, d.length - (d.dataOffset - d.offset))) {
+      return true;
+    }
+    if (blocker == null) {
+      blocker = d;
+    } else if (blocker != d) {
+      throw new IllegalStateException(String.format("Can't send data %s while blocker %s is pending on %s", d, blocker, this));
     }
-
-    blocker = d;
     return false;
   }
 
@@ -95,7 +90,7 @@ public class PhysicalNode
       return true;
     }
 
-    if (send(blocker)) {
+    if (client.send(blocker.buffer, blocker.dataOffset, blocker.length - (blocker.dataOffset - blocker.offset))) {
       blocker = null;
       return true;
     }
@@ -103,11 +98,6 @@ public class PhysicalNode
     return false;
   }
 
-  public boolean isBlocked()
-  {
-    return blocker != null;
-  }
-
   /**
    *
    * @return long
@@ -150,7 +140,7 @@ public class PhysicalNode
   /**
    * @return the channel
    */
-  public AbstractLengthPrependerClient getClient()
+  public WriteOnlyClient getClient()
   {
     return client;
   }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/ad4210ba/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git 
a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java 
b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index f819bb0..e0fe704 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -25,7 +25,6 @@ import java.nio.channels.SelectionKey;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -54,6 +53,7 @@ import com.datatorrent.netlet.AbstractLengthPrependerClient;
 import com.datatorrent.netlet.DefaultEventLoop;
 import com.datatorrent.netlet.EventLoop;
 import com.datatorrent.netlet.Listener.ServerListener;
+import com.datatorrent.netlet.WriteOnlyLengthPrependerClient;
 import com.datatorrent.netlet.util.VarInt;
 
 /**
@@ -116,7 +116,7 @@ public class Server implements ServerListener
   public void unregistered(SelectionKey key)
   {
     for (LogicalNode ln : subscriberGroups.values()) {
-      ln.boot(eventloop);
+      ln.boot();
     }
     /*
      * There may be unregister tasks scheduled to run on the event loop that use serverHelperExecutor.
@@ -281,7 +281,7 @@ public class Server implements ServerListener
           final String type = request.getStreamType();
           final long skipWindowId = (long)request.getBaseSeconds() << 32 | request.getWindowId();
           final LogicalNode ln = new LogicalNode(identifier, upstream_identifier, type, dl
-              .newIterator(skipWindowId), skipWindowId);
+              .newIterator(skipWindowId), skipWindowId, eventloop);
 
           int mask = request.getMask();
           if (mask != 0) {
@@ -291,16 +291,19 @@ public class Server implements ServerListener
           }
           final LogicalNode oln = subscriberGroups.put(type, ln);
           if (oln != null) {
-            oln.boot(eventloop);
+            oln.boot();
           }
-          AbstractLengthPrependerClient subscriber = new Subscriber(ln, request.getBufferSize());
-
-          subscriber.registered(key);
-          key.attach(subscriber);
-          key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ);
-
-          ln.catchUp();
-          dl.addDataListener(ln);
+          final Subscriber subscriber = new Subscriber(ln, request.getBufferSize());
+          eventloop.submit(new Runnable()
+          {
+            @Override
+            public void run()
+            {
+              key.attach(subscriber);
+              subscriber.registered(key);
+              subscriber.connected();
+            }
+          });
         }
       });
     } catch (RejectedExecutionException e) {
@@ -515,7 +518,7 @@ public class Server implements ServerListener
           /*
            * unregister the unidentified client since its job is done!
            */
-          unregistered(key);
+          unregistered(key.interestOps(0));
           ignore = true;
           logger.info("Received subscriber request: {}", request);
 
@@ -547,23 +550,36 @@ public class Server implements ServerListener
 
   }
 
-  class Subscriber extends AbstractLengthPrependerClient
+  private class Subscriber extends WriteOnlyLengthPrependerClient
   {
     private LogicalNode ln;
 
     Subscriber(LogicalNode ln, int bufferSize)
     {
-      super(1024, bufferSize);
+      super(1024 * 1024, bufferSize == 0 ? 256 * 1024 : bufferSize);
       this.ln = ln;
       ln.addConnection(this);
-      super.write = false;
     }
 
     @Override
-    public void onMessage(byte[] buffer, int offset, int size)
+    public void connected()
     {
-      logger.warn("Received data when no data is expected: {}",
-          Arrays.toString(Arrays.copyOfRange(buffer, offset, offset + size)));
+      super.connected();
+      serverHelperExecutor.submit(new Runnable()
+      {
+        @Override
+        public void run()
+        {
+          final DataList dl = publisherBuffers.get(ln.getUpstream());
+          if (dl != null) {
+            ln.catchUp();
+            dl.addDataListener(ln);
+          } else {
+            logger.error("Disconnecting {} with no matching data list.", this);
+            ln.boot();
+          }
+        }
+      });
     }
 
     @Override
@@ -802,7 +818,7 @@ public class Server implements ServerListener
       }
 
       for (LogicalNode ln : list) {
-        ln.boot(eventloop);
+        ln.boot();
       }
     }
 

Reply via email to