[ 
https://issues.apache.org/jira/browse/APEXCORE-259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16210144#comment-16210144
 ] 

ASF GitHub Bot commented on APEXCORE-259:
-----------------------------------------

sandeshh closed pull request #568: APEXCORE-259 Directly write to DataList
URL: https://github.com/apache/apex-core/pull/568
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff once a foreign (forked) pull request is merged,
the full diff is reproduced below for the sake of provenance:

diff --git 
a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
 
b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
index 69efc045a6..12077f0753 100644
--- 
a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
+++ 
b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
@@ -78,8 +78,9 @@
   private MutableInt nextOffset = new MutableInt();
   private final ListenersNotifier listenersNotifier = new ListenersNotifier();
   private final boolean backPressureEnabled;
+  private final boolean dataFromSocket;
 
-  public DataList(final String identifier, final int blockSize, final int 
numberOfCacheBlocks, final boolean backPressureEnabled)
+  public DataList(final String identifier, final int blockSize, final int 
numberOfCacheBlocks, final boolean backPressureEnabled, final boolean 
dataFromSocket)
   {
     if (numberOfCacheBlocks < 1) {
       throw new IllegalArgumentException("Invalid number of Data List Memory 
blocks " + numberOfCacheBlocks);
@@ -90,6 +91,9 @@ public DataList(final String identifier, final int blockSize, 
final int numberOf
     this.blockSize = blockSize;
     this.backPressureEnabled = backPressureEnabled;
     first = last = new Block(identifier, blockSize);
+    this.dataFromSocket = dataFromSocket;
+
+    logger.info("Flush from the {} will be used.", (dataFromSocket) ? "socket" 
: "queue");
   }
 
   public DataList(String identifier)
@@ -98,7 +102,7 @@ public DataList(String identifier)
      * We use 64MB (the default HDFS block getSize) as the getSize of the 
memory pool so we can flush the data 1 block
      * at a time to the filesystem. We will use default value of 8 block sizes 
to be cached in memory
      */
-    this(identifier, 64 * 1024 * 1024, 8, true);
+    this(identifier, 64 * 1024 * 1024, 8, true, true);
   }
 
   public int getBlockSize()
@@ -106,16 +110,15 @@ public int getBlockSize()
     return blockSize;
   }
 
-  public void rewind(final int baseSeconds, final int windowId) throws 
IOException
+  public void rewind(final long windowId) throws IOException
   {
-    final long longWindowId = (long)baseSeconds << 32 | windowId;
     logger.debug("Rewinding {} from window ID {} to window ID {}", this, 
Codec.getStringWindowId(last.ending_window),
-        Codec.getStringWindowId(longWindowId));
+        Codec.getStringWindowId(windowId));
 
     int numberOfInMemBlockRewound = 0;
     synchronized (this) {
       for (Block temp = first; temp != null; temp = temp.next) {
-        if (temp.starting_window >= longWindowId || temp.ending_window > 
longWindowId) {
+        if (temp.starting_window >= windowId || temp.ending_window > windowId) 
{
           if (temp != last) {
             last.refCount.decrementAndGet();
             last = temp;
@@ -136,7 +139,7 @@ public void rewind(final int baseSeconds, final int 
windowId) throws IOException
             last.next = null;
             last.acquire(true);
           }
-          this.baseSeconds = last.rewind(longWindowId);
+          this.baseSeconds = last.rewind(windowId);
           processingOffset = last.writingOffset;
           size = 0;
           break;
@@ -204,7 +207,7 @@ public void purge(final long windowId)
         temp.discard(false);
         synchronized (temp) {
           if (temp.refCount.get() != 0) {
-            logger.debug("Discarded block {} has positive reference count. 
Listeners: {}", temp, all_listeners);
+            logger.warn("Discarded block {} has positive reference count. 
Listeners: {}", temp, all_listeners);
             throw new IllegalStateException("Discarded block " + temp + " has 
positive reference count!");
           }
           if (temp.data != null) {
@@ -234,9 +237,19 @@ public String getIdentifier()
 
   public void flush(final int writeOffset)
   {
-    //logger.debug("size = {}, processingOffset = {}, nextOffset = {}, 
writeOffset = {}", size, processingOffset,
-    //    nextOffset.integer, writeOffset);
-    flush:
+    if (dataFromSocket) {
+      flushFromSocket(writeOffset);
+    } else {
+      flushInMemoryQueue(writeOffset);
+    }
+
+    last.writingOffset = writeOffset;
+    notifyListeners();
+  }
+
+  private void flushFromSocket(final int writeOffset)
+  {
+  flush:
     do {
       while (size == 0) {
         size = VarInt.read(last.data, processingOffset, writeOffset, 
nextOffset);
@@ -255,43 +268,61 @@ public void flush(final int writeOffset)
       processingOffset = nextOffset.integer;
 
       if (processingOffset + size <= writeOffset) {
-        switch (last.data[processingOffset]) {
-          case MessageType.BEGIN_WINDOW_VALUE:
-            Tuple bwt = Tuple.getTuple(last.data, processingOffset, size);
-            if (last.starting_window == -1) {
-              last.starting_window = baseSeconds | bwt.getWindowId();
-              last.ending_window = last.starting_window;
-              //logger.debug("assigned both window id {}", last);
-            } else {
-              last.ending_window = baseSeconds | bwt.getWindowId();
-              //logger.debug("assigned last window id {}", last);
-            }
-            break;
-
-          case MessageType.RESET_WINDOW_VALUE:
-            Tuple rwt = Tuple.getTuple(last.data, processingOffset, size);
-            baseSeconds = (long)rwt.getBaseSeconds() << 32;
-            break;
-
-          default:
-            break;
-        }
-        processingOffset += size;
-        size = 0;
+        processWindowTuple();
       } else {
-        if (writeOffset == last.data.length) {
-          nextOffset.integer = 0;
-          processingOffset = 0;
-          size = 0;
-        }
+        resetAtEndOffset(writeOffset);
         break;
       }
-    } while (true);
+    }
+    while (true);
+  }
 
-    last.writingOffset = writeOffset;
+  private void flushInMemoryQueue(final int writeOffset)
+  {
+    size = VarInt.read(last.data, processingOffset, writeOffset, nextOffset);
+    processingOffset = nextOffset.integer;
 
-    notifyListeners();
+    if (processingOffset + size <= writeOffset) {
+      processWindowTuple();
+    } else {
+      resetAtEndOffset(writeOffset);
+    }
+  }
+
+  private void resetAtEndOffset(final int writeOffset)
+  {
+    if (writeOffset == last.data.length) {
+      nextOffset.integer = 0;
+      processingOffset = 0;
+      size = 0;
+    }
+  }
 
+  private void processWindowTuple()
+  {
+    switch (last.data[processingOffset]) {
+      case MessageType.BEGIN_WINDOW_VALUE:
+        Tuple bwt = Tuple.getTuple(last.data, processingOffset, size);
+        if (last.starting_window == -1) {
+          last.starting_window = baseSeconds | bwt.getWindowId();
+          last.ending_window = last.starting_window;
+          //logger.debug("assigned both window id {}", last);
+        } else {
+          last.ending_window = baseSeconds | bwt.getWindowId();
+          //logger.debug("assigned last window id {}", last);
+        }
+        break;
+
+      case MessageType.RESET_WINDOW_VALUE:
+        Tuple rwt = Tuple.getTuple(last.data, processingOffset, size);
+        baseSeconds = (long)rwt.getBaseSeconds() << 32;
+        break;
+
+      default:
+        break;
+    }
+    processingOffset += size;
+    size = 0;
   }
 
   public void notifyListeners()
diff --git 
a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
 
b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
index 2e758936c9..da61e10405 100644
--- 
a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
+++ 
b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
@@ -37,9 +37,9 @@ public FastDataList(String identifier)
     super(identifier);
   }
 
-  public FastDataList(String identifier, int blocksize, int 
numberOfCacheBlocks, boolean backPressureEnabled)
+  public FastDataList(String identifier, int blocksize, int 
numberOfCacheBlocks, boolean backPressureEnabled, boolean dataFromSocket)
   {
-    super(identifier, blocksize, numberOfCacheBlocks, backPressureEnabled);
+    super(identifier, blocksize, numberOfCacheBlocks, backPressureEnabled, 
dataFromSocket);
   }
 
   long item;
diff --git 
a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java 
b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index c5700f2690..603f772938 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -27,6 +27,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map.Entry;
+import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -36,6 +37,7 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.jctools.queues.SpscArrayQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -345,8 +347,8 @@ public void run()
           DataList dl = publisherBuffers.get(upstream_identifier);
           if (dl == null) {
             dl = Tuple.FAST_VERSION.equals(request.getVersion()) ?
-                new FastDataList(upstream_identifier, blockSize, 
numberOfCacheBlocks, BACK_PRESSURE_ENABLED) :
-                new DataList(upstream_identifier, blockSize, 
numberOfCacheBlocks, BACK_PRESSURE_ENABLED);
+                new FastDataList(upstream_identifier, blockSize, 
numberOfCacheBlocks, BACK_PRESSURE_ENABLED, true) :
+                new DataList(upstream_identifier, blockSize, 
numberOfCacheBlocks, BACK_PRESSURE_ENABLED, true);
             DataList odl = publisherBuffers.putIfAbsent(upstream_identifier, 
dl);
             if (odl != null) {
               dl = odl;
@@ -448,28 +450,32 @@ public String toString()
    */
   public DataList handlePublisherRequest(PublishRequestTuple request, 
AbstractLengthPrependerClient connection)
   {
-    String identifier = request.getIdentifier();
+    return publisherRequestHelper(request.getIdentifier(), 
(long)request.getBaseSeconds() << 32 | request.getWindowId(), connection, 
request.getVersion());
+  }
 
+  private DataList publisherRequestHelper(String identifier, long windowId, 
AbstractLengthPrependerClient connection, String version)
+  {
     DataList dl = publisherBuffers.get(identifier);
 
     if (dl != null) {
       /*
        * close previous connection with the same identifier which is 
guaranteed to be unique.
        */
-      AbstractLengthPrependerClient previous = 
publisherChannels.put(identifier, connection);
-      if (previous != null) {
-        eventloop.disconnect(previous);
+
+      if (connection != null) {
+        AbstractLengthPrependerClient previous = 
publisherChannels.put(identifier, connection);
+        if (previous != null) {
+          eventloop.disconnect(previous);
+        }
       }
 
       try {
-        dl.rewind(request.getBaseSeconds(), request.getWindowId());
+        dl.rewind(windowId);
       } catch (IOException ie) {
         throw new RuntimeException(ie);
       }
     } else {
-      dl = Tuple.FAST_VERSION.equals(request.getVersion()) ?
-          new FastDataList(identifier, blockSize, numberOfCacheBlocks, 
BACK_PRESSURE_ENABLED) :
-          new DataList(identifier, blockSize, numberOfCacheBlocks, 
BACK_PRESSURE_ENABLED);
+      dl = Tuple.FAST_VERSION.equals(version) ? new FastDataList(identifier, 
blockSize, numberOfCacheBlocks, BACK_PRESSURE_ENABLED, true) : new 
DataList(identifier, blockSize, numberOfCacheBlocks, BACK_PRESSURE_ENABLED, 
connection != null ? true : false);
       DataList odl = publisherBuffers.putIfAbsent(identifier, dl);
       if (odl != null) {
         dl = odl;
@@ -480,6 +486,15 @@ public DataList handlePublisherRequest(PublishRequestTuple 
request, AbstractLeng
     return dl;
   }
 
+  public QueuePublisher handleQueuePublisher(long windowId, String identifier, 
int queueCapacity)
+  {
+    DataList dl = publisherRequestHelper(identifier, windowId, null, null);
+
+    dl.setAutoFlushExecutor(serverHelperExecutor);
+
+    return new QueuePublisher(dl, windowId, queueCapacity);
+  }
+
   @Override
   public ClientListener getClientConnection(SocketChannel sc, 
ServerSocketChannel ssc)
   {
@@ -896,7 +911,128 @@ private void teardown()
         ln.boot();
       }
     }
+  }
+
+  public static class QueuePublisher
+  {
+    private final DataList dl;
+    private byte[] buffer;
+    private volatile int offset = 0;
+    private Queue<byte[]> queue;
+    private Thread dataListWriter = new Thread(new Runnable()
+    {
+      @Override
+      public void run()
+      {
+        boolean flushRequired = false;
+        int progressiveSleep = 0;
+
+        logger.info("Inside run");
+
+        try {
+          while (true) {
+            byte[] data = queue.poll();
+
+            if (data == null) {
+              if (flushRequired) {
+                dl.flush(offset);
+                flushRequired = false;
+                progressiveSleep = 0;
+              } else {
+                try {
+                  Thread.sleep(progressiveSleep++);
+                  progressiveSleep = Math.min(progressiveSleep, 10);
+                } catch (InterruptedException e) {
+                  e.printStackTrace();
+                }
+              }
+            } else {
+              put(data);
+              flushRequired = true;
+            }
+          }
+        } catch (Exception ex) {
+          logger.info("Writing to DataList is interrupted because of the 
exception {} ", ex);
+        }
+      }
+    });
+
+    QueuePublisher(DataList dl, long windowId, int queueCapacity)
+    {
+      this.dl = dl;
+      buffer = dl.getBuffer(windowId);
+      dataListWriter.setName("DataListWriter " + dl.getIdentifier());
+      queue = new SpscArrayQueue<>(256 * queueCapacity);
+      dataListWriter.start();
+    }
+
+    public byte[] getBuffer()
+    {
+      return buffer;
+    }
 
+    public int getOffset()
+    {
+      return offset;
+    }
+
+    public void setOffset(int offset)
+    {
+      this.offset = offset;
+    }
+
+    public void send(byte[] tuple)
+    {
+      while (!queue.offer(tuple)) {
+      }
+    }
+
+    private void put(byte[] tuple)
+    {
+      int len = VarInt.getSize(tuple.length);
+
+      if (buffer.length - offset >= len + tuple.length) {
+        write(tuple, offset);
+        return;
+      }
+
+      if (buffer.length - offset >= len) {
+        offset = VarInt.write(tuple.length, buffer, offset);
+      }
+
+      dl.flush(buffer.length);
+
+      acquireNewMemory();
+
+      write(tuple, 0);
+    }
+
+    private void write(byte[] tuple, int pos)
+    {
+      offset = VarInt.write(tuple.length, buffer, pos);
+      System.arraycopy(tuple, 0, buffer, offset, tuple.length);
+      offset += tuple.length;
+    }
+
+    private void acquireNewMemory()
+    {
+      int count = 0;
+      while (!dl.isMemoryBlockAvailable()) {
+        try {
+          Thread.sleep(10);
+
+          if (count++ == 100) {
+            count = 0;
+            logger.warn("Memory block not available, spooling needs to be 
done");
+          }
+        } catch (InterruptedException e) {
+          logger.warn("QueuePublisher received interrupt exception.");
+        }
+      }
+
+      buffer = dl.newBuffer(buffer.length);
+      dl.addBuffer(buffer);
+    }
   }
 
   abstract class SeedDataClient extends AbstractLengthPrependerClient
diff --git 
a/common/src/main/java/org/apache/apex/common/util/PropertiesHelper.java 
b/common/src/main/java/org/apache/apex/common/util/PropertiesHelper.java
index aa3d499919..401bb4c8c3 100644
--- a/common/src/main/java/org/apache/apex/common/util/PropertiesHelper.java
+++ b/common/src/main/java/org/apache/apex/common/util/PropertiesHelper.java
@@ -33,25 +33,71 @@
    * @param maxValue maximum valid value
    * @return returns the value if it is between min and max value(inclusive), 
otherwise default value is returned.
    */
-  public static long getLong(String propertyName, long defaultValue, long 
minValue, long maxValue)
+  public static long getLong(final String propertyName, final long 
defaultValue, final long minValue, final long maxValue)
   {
-    String property = System.getProperty(propertyName);
-    long result = defaultValue;
-    if (property != null) {
-      try {
+
+    PropertyReader<Long> propertyReader = new PropertyReader<Long>()
+    {
+      @Override
+      protected Long valueOf(String property)
+      {
         long value = Long.decode(property);
         if (value < minValue || value > maxValue) {
           logger.warn("Property {} is outside the range [{},{}], setting to 
default {}", propertyName, minValue, maxValue, defaultValue);
-        } else {
-          result = value;
+          return null;
+        }
+
+        return value;
+      }
+    };
+
+    return propertyReader.getValue(propertyName, defaultValue);
+  }
+
+  /**
+   * Reading system property as long value.
+   * @param propertyName Name of the system property
+   * @param defaultValue Default value to return in case of an error
+   * @return returns the value if it is valid or return invalid.
+   */
+  public static boolean getBoolean(String propertyName, boolean defaultValue)
+  {
+    PropertyReader<Boolean> propertyReader = new PropertyReader<Boolean>()
+    {
+      @Override
+      protected Boolean valueOf(String property)
+      {
+        return Boolean.valueOf(property);
+      }
+    };
+
+    return propertyReader.getValue(propertyName, defaultValue);
+  }
+
+  public abstract static class PropertyReader<T>
+  {
+    public T getValue(String propertyName, T defaultValue)
+    {
+      String property = System.getProperty(propertyName);
+      T result = defaultValue;
+      if (property != null) {
+        try {
+          T value = valueOf(property);
+          if (value == null) {
+            logger.warn("Property {} is invalid, setting to default {}", 
propertyName, defaultValue);
+          } else {
+            result = value;
+          }
+        } catch (Exception ex) {
+          logger.warn("Can't convert property {} value {} to a long, using 
default {}", propertyName, property, defaultValue, ex);
         }
-      } catch (Exception ex) {
-        logger.warn("Can't convert property {} value {} to a long, using 
default {}", propertyName, property, defaultValue, ex);
       }
+      logger.debug("System property {}'s value is {}", propertyName, result);
+
+      return result;
     }
-    logger.debug("System property {}'s value is {}", propertyName, result);
 
-    return result;
+    protected abstract T valueOf(String property);
   }
 
   private static final Logger logger = 
LoggerFactory.getLogger(PropertiesHelper.class);
diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java 
b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index 41e358e831..f32a2ca1e1 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -163,18 +163,22 @@ public ContainerHeartbeatResponse 
processHeartbeat(ContainerHeartbeat msg) throw
      */
     private final AtomicInteger heartbeatCount = new AtomicInteger();
     private final WindowGenerator windowGenerator;
+    private boolean perContainerBufferServer = false;
 
-    public LocalStreamingContainer(String containerId, 
StreamingContainerUmbilicalProtocol umbilical, WindowGenerator winGen)
+    public LocalStreamingContainer(String containerId, 
StreamingContainerUmbilicalProtocol umbilical, WindowGenerator winGen, Server 
server, InetSocketAddress bufferServerAddress, boolean perContainerBufferServer)
     {
       super(containerId, umbilical);
+      this.bufferServerAddress = bufferServerAddress;
+      this.bufferServer = server;
       this.windowGenerator = winGen;
+      this.perContainerBufferServer = perContainerBufferServer;
     }
 
     public void run(StreamingContainerContext ctx) throws Exception
     {
       LOG.debug("container {} context {}", getContainerId(), ctx);
       setup(ctx);
-      if (bufferServerAddress != null && 
!bufferServerAddress.getAddress().isLoopbackAddress()) {
+      if (bufferServerAddress != null && bufferServerAddress.getAddress() != 
null && bufferServerAddress.getAddress().isLoopbackAddress()) {
         bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, 
bufferServerAddress.getPort());
       }
 
@@ -186,6 +190,10 @@ public void run(StreamingContainerContext ctx) throws 
Exception
       } finally {
         // teardown
         try {
+          if (perContainerBufferServer) {
+            bufferServer.stop();
+          }
+          bufferServer = null;
           teardown();
         } catch (Exception e) {
           if (!hasError) {
@@ -250,7 +258,7 @@ private 
LocalStreamingContainerLauncher(ContainerStartRequest cdr, List<Thread>
       if (mockComponentFactory != null) {
         wingen = mockComponentFactory.setupWindowGenerator();
       }
-      this.child = new LocalStreamingContainer(containerId, umbilical, wingen);
+      this.child = new LocalStreamingContainer(containerId, umbilical, wingen, 
perContainerBufferServer ? null : bufferServer, bufferServerAddress, 
perContainerBufferServer);
       ContainerResource cr = new 
ContainerResource(cdr.container.getResourceRequestPriority(), containerId, 
"localhost", cdr.container.getRequiredMemoryMB(), 
cdr.container.getRequiredVCores(), null);
       StreamingContainerAgent sca = dnmgr.assignContainer(cr, 
perContainerBufferServer ? null : bufferServerAddress);
       if (sca != null) {
@@ -477,7 +485,6 @@ public void run(long runMillis)
         bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, 
bufferServer.run().getPort());
         LOG.info("Buffer server started: {}", bufferServerAddress);
       }
-
       long endMillis = System.currentTimeMillis() + runMillis;
 
       while (!appDone) {
diff --git 
a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java 
b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index f5aaf352c7..15573f5586 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -42,6 +42,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.common.util.PropertiesHelper;
 import org.apache.apex.log.LogFileInformation;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
@@ -116,6 +117,7 @@
 import com.datatorrent.stram.stream.OiOStream;
 import com.datatorrent.stram.stream.PartitionAwareSink;
 import com.datatorrent.stram.stream.PartitionAwareSinkForPersistence;
+import com.datatorrent.stram.stream.QueueServerPublisher;
 import com.datatorrent.stram.util.LoggerUtil;
 
 import net.engio.mbassy.bus.MBassador;
@@ -164,6 +166,7 @@
   private final MBassador<ContainerEvent> eventBus; // event bus for 
publishing container events
   HashSet<Component<ContainerContext>> components;
   private RequestFactory requestFactory;
+  private static boolean publishTuplesFromSocket = 
PropertiesHelper.getBoolean("org.apache.apex.socket.publisher.enable", false);
 
   static {
     try {
@@ -966,7 +969,14 @@ private void deployNodes(List<OperatorDeployInfo> 
nodeList) throws IOException
       bssc.setBufferServerAddress(new 
InetSocketAddress(InetAddress.getByName(null), nodi.bufferServerPort));
     }
 
-    Stream publisher = fastPublisherSubscriber ? new 
FastPublisher(connIdentifier, queueCapacity * 256) : new 
BufferServerPublisher(connIdentifier, queueCapacity);
+    Stream publisher;
+
+    if (publishTuplesFromSocket) {
+      publisher = fastPublisherSubscriber ? new FastPublisher(connIdentifier, 
queueCapacity * 256) : new BufferServerPublisher(connIdentifier, queueCapacity);
+    } else {
+      publisher = new QueueServerPublisher(connIdentifier, bufferServer, 
queueCapacity);
+    }
+
     return new HashMap.SimpleEntry<>(sinkIdentifier, new 
ComponentContextPair<>(publisher, bssc));
   }
 
diff --git 
a/engine/src/main/java/com/datatorrent/stram/stream/AbstractPublisher.java 
b/engine/src/main/java/com/datatorrent/stram/stream/AbstractPublisher.java
new file mode 100644
index 0000000000..bbabd0f682
--- /dev/null
+++ b/engine/src/main/java/com/datatorrent/stram/stream/AbstractPublisher.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.stream;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.api.operator.ControlTuple;
+
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.bufferserver.packet.BeginWindowTuple;
+import com.datatorrent.bufferserver.packet.DataTuple;
+import com.datatorrent.bufferserver.packet.EndStreamTuple;
+import com.datatorrent.bufferserver.packet.EndWindowTuple;
+import com.datatorrent.bufferserver.packet.MessageType;
+import com.datatorrent.bufferserver.packet.PayloadTuple;
+import com.datatorrent.bufferserver.packet.ResetWindowTuple;
+import com.datatorrent.bufferserver.packet.WindowIdTuple;
+import com.datatorrent.netlet.util.Slice;
+import com.datatorrent.stram.codec.StatefulStreamCodec;
+import com.datatorrent.stram.engine.ByteCounterStream;
+import com.datatorrent.stram.engine.StreamContext;
+import com.datatorrent.stram.tuple.CustomControlTuple;
+import com.datatorrent.stram.tuple.Tuple;
+
+public abstract class AbstractPublisher implements ByteCounterStream
+{
+  private StreamCodec<Object> serde;
+  private final AtomicLong publishedByteCount;
+  private int count;
+  private StatefulStreamCodec<Object> statefulSerde;
+
+  public AbstractPublisher()
+  {
+    this.publishedByteCount = new AtomicLong(0);
+  }
+
+  @Override
+  public long getByteCount(boolean reset)
+  {
+    if (reset) {
+      return publishedByteCount.getAndSet(0);
+    }
+
+    return publishedByteCount.get();
+  }
+
+  @Override
+  public void teardown()
+  {
+
+  }
+
+  @Override
+  public void put(Object payload)
+  {
+    count++;
+    byte[] array;
+    if (payload instanceof Tuple) {
+      final Tuple t = (Tuple)payload;
+
+      switch (t.getType()) {
+        case CHECKPOINT:
+          if (statefulSerde != null) {
+            statefulSerde.resetState();
+          }
+          array = WindowIdTuple.getSerializedTuple((int)t.getWindowId());
+          array[0] = MessageType.CHECKPOINT_VALUE;
+          break;
+
+        case BEGIN_WINDOW:
+          array = BeginWindowTuple.getSerializedTuple((int)t.getWindowId());
+          break;
+
+        case END_WINDOW:
+          array = EndWindowTuple.getSerializedTuple((int)t.getWindowId());
+          break;
+
+        case END_STREAM:
+          array = EndStreamTuple.getSerializedTuple((int)t.getWindowId());
+          break;
+
+        case RESET_WINDOW:
+          com.datatorrent.stram.tuple.ResetWindowTuple rwt = 
(com.datatorrent.stram.tuple.ResetWindowTuple)t;
+          array = ResetWindowTuple.getSerializedTuple(rwt.getBaseSeconds(), 
rwt.getIntervalMillis());
+          break;
+
+        case CUSTOM_CONTROL:
+          if (statefulSerde == null) {
+            array = com.datatorrent.bufferserver.packet.CustomControlTuple
+                .getSerializedTuple(MessageType.CUSTOM_CONTROL_VALUE, 
serde.toByteArray(payload));
+          } else {
+            StatefulStreamCodec.DataStatePair dsp = 
statefulSerde.toDataStatePair(payload);
+            if (dsp.state != null) {
+              array = com.datatorrent.bufferserver.packet.CustomControlTuple
+                  .getSerializedTuple(MessageType.CODEC_STATE_VALUE, 
dsp.state);
+              send(array);
+            }
+            array = com.datatorrent.bufferserver.packet.CustomControlTuple
+                .getSerializedTuple(MessageType.CUSTOM_CONTROL_VALUE, 
dsp.data);
+          }
+          break;
+
+        default:
+          throw new UnsupportedOperationException("this data type is not 
handled in the stream");
+      }
+    } else {
+      if (statefulSerde == null) {
+
+        Slice slice = serde.toByteArray(payload);
+        int partition = serde.getPartition(payload);
+
+        array = PayloadTuple.getSerializedTuple(partition, slice);
+      } else {
+        StatefulStreamCodec.DataStatePair dsp = 
statefulSerde.toDataStatePair(payload);
+        /*
+         * if there is any state write that for the subscriber before we write 
the data.
+         */
+        if (dsp.state != null) {
+          array = DataTuple.getSerializedTuple(MessageType.CODEC_STATE_VALUE, 
dsp.state);
+          send(array);
+        }
+        /*
+         * Now that the state if any has been sent, we can proceed with the 
actual data we want to send.
+         */
+
+        array = 
PayloadTuple.getSerializedTuple(statefulSerde.getPartition(payload), dsp.data);
+      }
+    }
+
+    send(array);
+    publishedByteCount.addAndGet(array.length);
+  }
+
+  @Override
+  public int getCount(boolean reset)
+  {
+    try {
+      return count;
+    } finally {
+      if (reset) {
+        count = 0;
+      }
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void setup(StreamContext context)
+  {
+    StreamCodec<?> codec = context.get(StreamContext.CODEC);
+    if (codec == null) {
+      statefulSerde = 
((StatefulStreamCodec<Object>)StreamContext.CODEC.defaultValue).newInstance();
+    } else if (codec instanceof StatefulStreamCodec) {
+      statefulSerde = ((StatefulStreamCodec<Object>)codec).newInstance();
+    } else {
+      serde = (StreamCodec<Object>)codec;
+    }
+  }
+
+  @Override
+  public void deactivate()
+  {
+  }
+
+  @Override
+  public boolean putControl(ControlTuple payload)
+  {
+    put(new CustomControlTuple(payload));
+    return false;
+  }
+
+  public abstract void send(byte[] data);
+
+  private static final Logger logger = 
LoggerFactory.getLogger(AbstractPublisher.class);
+}
diff --git 
a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java 
b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
index de6aced780..ecacf7baf9 100644
--- 
a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
+++ 
b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
@@ -19,33 +19,12 @@
 package com.datatorrent.stram.stream;
 
 import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import org.apache.apex.api.operator.ControlTuple;
-
-import com.datatorrent.api.StreamCodec;
 import com.datatorrent.bufferserver.client.Publisher;
-import com.datatorrent.bufferserver.packet.BeginWindowTuple;
-import com.datatorrent.bufferserver.packet.DataTuple;
-import com.datatorrent.bufferserver.packet.EndStreamTuple;
-import com.datatorrent.bufferserver.packet.EndWindowTuple;
-import com.datatorrent.bufferserver.packet.MessageType;
-import com.datatorrent.bufferserver.packet.PayloadTuple;
-import com.datatorrent.bufferserver.packet.ResetWindowTuple;
-import com.datatorrent.bufferserver.packet.WindowIdTuple;
 import com.datatorrent.bufferserver.util.Codec;
 import com.datatorrent.netlet.EventLoop;
-import com.datatorrent.stram.codec.StatefulStreamCodec;
-import com.datatorrent.stram.codec.StatefulStreamCodec.DataStatePair;
-import com.datatorrent.stram.engine.ByteCounterStream;
 import com.datatorrent.stram.engine.StreamContext;
-import com.datatorrent.stram.tuple.CustomControlTuple;
-import com.datatorrent.stram.tuple.Tuple;
-
-import static java.lang.Thread.sleep;
 
 /**
  * Implements tuple flow of node to then buffer server in a logical stream<p>
@@ -57,126 +36,36 @@
  *
  * @since 0.3.2
  */
-public class BufferServerPublisher extends Publisher implements 
ByteCounterStream
+public class BufferServerPublisher extends AbstractPublisher
 {
-  private StreamCodec<Object> serde;
-  private final AtomicLong publishedByteCount;
   private EventLoop eventloop;
-  private int count;
-  private StatefulStreamCodec<Object> statefulSerde;
+  Publisher publisher;
 
   public BufferServerPublisher(String sourceId, int queueCapacity)
   {
-    super(sourceId, queueCapacity);
-    this.publishedByteCount = new AtomicLong(0);
+    super();
+    publisher = new Publisher(sourceId, queueCapacity)
+    {
+      @Override
+      public void onMessage(byte[] buffer, int offset, int size)
+      {
+        throw new RuntimeException("OutputStream is not supposed to receive 
anything!");
+      }
+    };
   }
 
-  /**
-   *
-   * @param payload
-   */
   @Override
-  @SuppressWarnings("SleepWhileInLoop")
-  public void put(Object payload)
+  public void send(byte[] data)
   {
-    count++;
-    byte[] array;
-    if (payload instanceof Tuple) {
-      final Tuple t = (Tuple)payload;
-
-      switch (t.getType()) {
-        case CHECKPOINT:
-          if (statefulSerde != null) {
-            statefulSerde.resetState();
-          }
-          array = WindowIdTuple.getSerializedTuple((int)t.getWindowId());
-          array[0] = MessageType.CHECKPOINT_VALUE;
-          break;
-
-        case BEGIN_WINDOW:
-          array = BeginWindowTuple.getSerializedTuple((int)t.getWindowId());
-          break;
-
-        case END_WINDOW:
-          array = EndWindowTuple.getSerializedTuple((int)t.getWindowId());
-          break;
-
-        case CUSTOM_CONTROL:
-          if (statefulSerde == null) {
-            array = com.datatorrent.bufferserver.packet.CustomControlTuple
-                .getSerializedTuple(MessageType.CUSTOM_CONTROL_VALUE, 
serde.toByteArray(payload));
-          } else {
-            DataStatePair dsp = statefulSerde.toDataStatePair(payload);
-            if (dsp.state != null) {
-              array = com.datatorrent.bufferserver.packet.CustomControlTuple
-                  .getSerializedTuple(MessageType.CODEC_STATE_VALUE, 
dsp.state);
-              try {
-                while (!write(array)) {
-                  sleep(5);
-                }
-              } catch (InterruptedException ie) {
-                throw new RuntimeException(ie);
-              }
-            }
-            array = com.datatorrent.bufferserver.packet.CustomControlTuple
-                .getSerializedTuple(MessageType.CUSTOM_CONTROL_VALUE, 
dsp.data);
-          }
-          break;
-
-        case END_STREAM:
-          array = EndStreamTuple.getSerializedTuple((int)t.getWindowId());
-          break;
-
-        case RESET_WINDOW:
-          com.datatorrent.stram.tuple.ResetWindowTuple rwt = 
(com.datatorrent.stram.tuple.ResetWindowTuple)t;
-          array = ResetWindowTuple.getSerializedTuple(rwt.getBaseSeconds(), 
rwt.getIntervalMillis());
-          break;
-
-        default:
-          throw new UnsupportedOperationException("this data type is not 
handled in the stream");
-      }
-    } else {
-      if (statefulSerde == null) {
-        array = PayloadTuple.getSerializedTuple(serde.getPartition(payload), 
serde.toByteArray(payload));
-      } else {
-        DataStatePair dsp = statefulSerde.toDataStatePair(payload);
-        /*
-         * if there is any state write that for the subscriber before we write 
the data.
-         */
-        if (dsp.state != null) {
-          array = DataTuple.getSerializedTuple(MessageType.CODEC_STATE_VALUE, 
dsp.state);
-          try {
-            while (!write(array)) {
-              sleep(5);
-            }
-          } catch (InterruptedException ie) {
-            throw new RuntimeException(ie);
-          }
-        }
-        /*
-         * Now that the state if any has been sent, we can proceed with the 
actual data we want to send.
-         */
-        array = 
PayloadTuple.getSerializedTuple(statefulSerde.getPartition(payload), dsp.data);
-      }
-    }
-
     try {
-      while (!write(array)) {
-        sleep(5);
+      while (!publisher.write(data)) {
+        java.lang.Thread.sleep(5);
       }
-      publishedByteCount.addAndGet(array.length);
     } catch (InterruptedException ie) {
       throw new RuntimeException(ie);
     }
   }
 
-  @Override
-  public boolean putControl(ControlTuple payload)
-  {
-    put(new CustomControlTuple(payload));
-    return false;
-  }
-
   /**
    *
    * @param context
@@ -185,67 +74,20 @@ public boolean putControl(ControlTuple payload)
   @SuppressWarnings("unchecked")
   public void activate(StreamContext context)
   {
-    setToken(context.get(StreamContext.BUFFER_SERVER_TOKEN));
+    publisher.setToken(context.get(StreamContext.BUFFER_SERVER_TOKEN));
     InetSocketAddress address = context.getBufferServerAddress();
     eventloop = context.get(StreamContext.EVENT_LOOP);
-    eventloop.connect(address.isUnresolved() ? new 
InetSocketAddress(address.getHostName(), address.getPort()) : address, this);
+    eventloop.connect(address.isUnresolved() ? new 
InetSocketAddress(address.getHostName(), address.getPort()) : address, 
publisher);
 
     logger.debug("Registering publisher: {} {} windowId={} server={}", new 
Object[] {context.getSourceId(), context.getId(), 
Codec.getStringWindowId(context.getFinishedWindowId()), 
context.getBufferServerAddress()});
-    super.activate(null, context.getFinishedWindowId());
+    publisher.activate(null, context.getFinishedWindowId());
   }
 
   @Override
   public void deactivate()
   {
-    setToken(null);
-    eventloop.disconnect(this);
-  }
-
-  @Override
-  public void onMessage(byte[] buffer, int offset, int size)
-  {
-    throw new RuntimeException("OutputStream is not supposed to receive 
anything!");
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void setup(StreamContext context)
-  {
-    StreamCodec<?> codec = context.get(StreamContext.CODEC);
-    if (codec == null) {
-      statefulSerde = 
((StatefulStreamCodec<Object>)StreamContext.CODEC.defaultValue).newInstance();
-    } else if (codec instanceof StatefulStreamCodec) {
-      statefulSerde = ((StatefulStreamCodec<Object>)codec).newInstance();
-    } else {
-      serde = (StreamCodec<Object>)codec;
-    }
-  }
-
-  @Override
-  public void teardown()
-  {
-  }
-
-  @Override
-  public long getByteCount(boolean reset)
-  {
-    if (reset) {
-      return publishedByteCount.getAndSet(0);
-    }
-
-    return publishedByteCount.get();
-  }
-
-  @Override
-  public int getCount(boolean reset)
-  {
-    try {
-      return count;
-    } finally {
-      if (reset) {
-        count = 0;
-      }
-    }
+    publisher.setToken(null);
+    eventloop.disconnect(publisher);
   }
 
   private static final Logger logger = 
LoggerFactory.getLogger(BufferServerPublisher.class);
diff --git 
a/engine/src/main/java/com/datatorrent/stram/stream/QueueServerPublisher.java 
b/engine/src/main/java/com/datatorrent/stram/stream/QueueServerPublisher.java
new file mode 100644
index 0000000000..12472add5a
--- /dev/null
+++ 
b/engine/src/main/java/com/datatorrent/stram/stream/QueueServerPublisher.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.datatorrent.bufferserver.server.Server;
+import com.datatorrent.stram.engine.StreamContext;
+
+public class QueueServerPublisher extends AbstractPublisher
+{
+  private Server.QueuePublisher queuePublisher;
+  private Server server;
+  private String identifier;
+  private int queueCapacity;
+
+  public QueueServerPublisher(String sourceId, Server server, int 
queueCapacity)
+  {
+    super();
+    this.server = server;
+    this.identifier = sourceId;
+    this.queueCapacity = queueCapacity;
+  }
+
+  @Override
+  public void activate(StreamContext context)
+  {
+    queuePublisher = 
server.handleQueuePublisher(context.getFinishedWindowId(), identifier, 
queueCapacity);
+  }
+
+  @Override
+  public void send(byte[] data)
+  {
+    queuePublisher.send(data);
+  }
+
+  private static final Logger logger = 
LoggerFactory.getLogger(QueueServerPublisher.class);
+}
diff --git 
a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java 
b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index 66f1b84ae6..6f432813ee 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -65,6 +65,7 @@
 import com.datatorrent.stram.stream.BufferServerPublisher;
 import com.datatorrent.stram.stream.BufferServerSubscriber;
 import com.datatorrent.stram.stream.OiOStream;
+import com.datatorrent.stram.stream.QueueServerPublisher;
 import com.datatorrent.stram.tuple.CustomControlTuple;
 import com.datatorrent.stram.tuple.EndStreamTuple;
 import com.datatorrent.stram.tuple.EndWindowTuple;
@@ -484,6 +485,128 @@ public void run()
     ((DefaultEventLoop)eventloop).stop();
   }
 
+
+  @Test
+  public void testQueueServer() throws InterruptedException, IOException
+  {
+    final String streamName = "streamName";
+    final String upstreamNodeId = "upstreamNodeId";
+    final String  downstreamNodeId = "downStreamNodeId";
+
+    EventLoop eventloop = 
DefaultEventLoop.createEventLoop("StreamTestEventLoop");
+
+    ((DefaultEventLoop)eventloop).start();
+    final Server bufferServer = new Server(eventloop, 0); // find random port
+    final int bufferServerPort = bufferServer.run().getPort();
+
+    final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<Object>();
+    final BlockingQueue<Object> tuples = new ArrayBlockingQueue<Object>(10);
+
+    GenericTestOperator go = new GenericTestOperator();
+    final GenericNode gn = new GenericNode(go, new 
com.datatorrent.stram.engine.OperatorContext(0, "operator",
+        new DefaultAttributeMap(), null));
+    gn.setId(1);
+
+    Sink<Object> output = new Sink<Object>()
+    {
+      @Override
+      public void put(Object tuple)
+      {
+        tuples.add(tuple);
+      }
+
+      @Override
+      public int getCount(boolean reset)
+      {
+        return 0;
+      }
+    };
+
+    InetSocketAddress socketAddress = new InetSocketAddress("localhost", 
bufferServerPort);
+
+    StreamContext issContext = new StreamContext(streamName);
+    issContext.setSourceId(upstreamNodeId);
+    issContext.setSinkId(downstreamNodeId);
+    issContext.setFinishedWindowId(-1);
+    issContext.setBufferServerAddress(socketAddress);
+    issContext.put(StreamContext.CODEC, serde);
+    issContext.put(StreamContext.EVENT_LOOP, eventloop);
+
+    StreamContext ossContext = new StreamContext(streamName);
+    ossContext.setSourceId(upstreamNodeId);
+    ossContext.setSinkId(downstreamNodeId);
+    ossContext.setBufferServerAddress(socketAddress);
+    ossContext.put(StreamContext.CODEC, serde);
+    ossContext.put(StreamContext.EVENT_LOOP, eventloop);
+
+    QueueServerPublisher queueServerPublisher = new 
QueueServerPublisher(upstreamNodeId, bufferServer, 1024);
+    queueServerPublisher.setup(ossContext);
+    queueServerPublisher.activate(ossContext);
+
+    queueServerPublisher.put(new Tuple(MessageType.BEGIN_WINDOW, 0x1L));
+    byte[] buff = PayloadTuple.getSerializedTuple(0, 1);
+    buff[buff.length - 1] = (byte)1;
+    queueServerPublisher.put(buff);
+    queueServerPublisher.put(new EndWindowTuple(0x1L));
+    queueServerPublisher.put(new Tuple(MessageType.BEGIN_WINDOW, 0x2L));
+    buff = PayloadTuple.getSerializedTuple(0, 1);
+    buff[buff.length - 1] = (byte)2;
+    queueServerPublisher.put(buff);
+    queueServerPublisher.put(new EndWindowTuple(0x2L));
+    queueServerPublisher.put(new Tuple(MessageType.BEGIN_WINDOW, 0x3L));
+    buff = PayloadTuple.getSerializedTuple(0, 1);
+    buff[buff.length - 1] = (byte)3;
+    queueServerPublisher.put(buff);
+
+    queueServerPublisher.put(new EndWindowTuple(0x3L));
+    queueServerPublisher.put(new EndStreamTuple(0L));
+
+    BufferServerSubscriber iss = new BufferServerSubscriber(downstreamNodeId, 
1024);
+    iss.setup(issContext);
+
+    gn.connectInputPort(GenericTestOperator.IPORT1, 
iss.acquireReservoir("testReservoir", 10));
+    gn.connectOutputPort(GenericTestOperator.OPORT1, output);
+
+    SweepableReservoir tupleWait = iss.acquireReservoir("testReservoir2", 10);
+
+    iss.activate(issContext);
+
+    while (tupleWait.sweep() == null) {
+      Thread.sleep(100);
+    }
+
+    gn.firstWindowMillis = 0;
+    gn.windowWidthMillis = 100;
+
+    Thread t = new Thread()
+    {
+      @Override
+      public void run()
+      {
+        gn.activate();
+        gn.run();
+        gn.deactivate();
+      }
+    };
+
+    t.start();
+    t.join();
+
+    Assert.assertEquals(10, tuples.size());
+
+    List<Object> list = new ArrayList<>(tuples);
+
+    Assert.assertEquals("Payload Tuple 1", 1, ((byte[])list.get(1))[5]);
+    Assert.assertEquals("Payload Tuple 2", 2, ((byte[])list.get(4))[5]);
+    Assert.assertEquals("Payload Tuple 3", 3, ((byte[])list.get(7))[5]);
+
+    if (bufferServer != null) {
+      eventloop.stop(bufferServer);
+    }
+
+    ((DefaultEventLoop)eventloop).stop();
+  }
+
   @Test
   public void testPrematureTermination() throws InterruptedException
   {
diff --git 
a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java 
b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
index 15de1773a1..88770793e4 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
@@ -208,7 +208,7 @@ public void verify() throws InterruptedException
       }
     }
 
-    eventloop.disconnect(oss);
+    eventloop.disconnect(oss.publisher);
     eventloop.disconnect(iss);
     Assert.assertEquals("Received messages", 1, messageCount.get());
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Use BlockingQueue to pass data to the buffer server
> ---------------------------------------------------
>
>                 Key: APEXCORE-259
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-259
>             Project: Apache Apex Core
>          Issue Type: Improvement
>            Reporter: Vlad Rozov
>
> In case the buffer server resides in the same container as an operator, tuples 
> should be passed in memory, bypassing the loopback network adapter



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to