Repository: apex-core
Updated Branches:
  refs/heads/master abc836ca1 -> d80501bdc


APEXCORE-570 Back pressure implementation to suspend publisher when subscriber 
is slow and the buffer fills up


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/f2d539d4
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/f2d539d4
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/f2d539d4

Branch: refs/heads/master
Commit: f2d539d4d88e76c86ea23b803ac38c7a2584a18f
Parents: 7ea7f60
Author: Pramod Immaneni <[email protected]>
Authored: Sat Jan 14 01:33:07 2017 -0800
Committer: Pramod Immaneni <[email protected]>
Committed: Sat Feb 18 07:24:46 2017 +0530

----------------------------------------------------------------------
 .../bufferserver/internal/DataList.java         | 52 +++++++++++++++++---
 .../bufferserver/internal/FastDataList.java     |  4 +-
 .../datatorrent/bufferserver/server/Server.java | 10 ++--
 .../bufferserver/storage/DiskStorageTest.java   |  2 +-
 .../datatorrent/stram/StramLocalCluster.java    |  2 +-
 5 files changed, 55 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/f2d539d4/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
index 3f596d9..3a446b6 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
@@ -72,8 +72,9 @@ public class DataList
   private final AtomicInteger numberOfInMemBlockPermits;
   private MutableInt nextOffset = new MutableInt();
   private Future<?> future;
+  private final boolean backPressureEnabled;
 
-  public DataList(final String identifier, final int blockSize, final int numberOfCacheBlocks)
+  public DataList(final String identifier, final int blockSize, final int numberOfCacheBlocks, final boolean backPressureEnabled)
   {
     if (numberOfCacheBlocks < 1) {
       throw new IllegalArgumentException("Invalid number of Data List Memory blocks " + numberOfCacheBlocks);
@@ -82,6 +83,7 @@ public class DataList
     numberOfInMemBlockPermits = new AtomicInteger(MAX_COUNT_OF_INMEM_BLOCKS - 1);
     this.identifier = identifier;
     this.blockSize = blockSize;
+    this.backPressureEnabled = backPressureEnabled;
     first = last = new Block(identifier, blockSize);
   }
 
@@ -91,7 +93,7 @@ public class DataList
      * We use 64MB (the default HDFS block getSize) as the getSize of the memory pool so we can flush the data 1 block
      * at a time to the filesystem. We will use default value of 8 block sizes to be cached in memory
      */
-    this(identifier, 64 * 1024 * 1024, 8);
+    this(identifier, 64 * 1024 * 1024, 8, true);
   }
 
   public int getBlockSize()
@@ -172,6 +174,7 @@ public class DataList
         }
       }
       first = last;
+      first.prev = null;
     }
     numberOfInMemBlockPermits.set(MAX_COUNT_OF_INMEM_BLOCKS - 1);
   }
@@ -188,6 +191,7 @@ public class DataList
         if (temp.ending_window > windowId || temp == last) {
           if (prev != null) {
             first = temp;
+            first.prev = null;
           }
           first.purge(windowId);
           break;
@@ -436,7 +440,8 @@ public class DataList
       logger.warn("Exceeded allowed memory block allocation by {}", -numberOfInMemBlockPermits);
     }
     last.next = new Block(identifier, array, last.ending_window, last.ending_window);
-    last.release(false);
+    last.next.prev = last;
+    last.release(false, true);
     last = last.next;
   }
 
@@ -554,6 +559,10 @@ public class DataList
      */
     Block next;
     /**
+     * the previous in the chain
+     */
+    Block prev;
+    /**
      * how count of references to this block.
      */
     private final AtomicInteger refCount;
@@ -822,10 +831,10 @@ public class DataList
       };
     }
 
-    protected void release(boolean wait)
+    protected void release(boolean wait, boolean writer)
     {
       final int refCount = this.refCount.decrementAndGet();
-      if (refCount == 0 && storage != null) {
+      if (canEvict(refCount, writer)) {
         assert (next != null);
         final Runnable storer = getStorer(data, readingOffset, writingOffset, storage);
         if (future != null && future.cancel(false)) {
@@ -840,8 +849,37 @@ public class DataList
         } else {
           future = null;
         }
+      }
+    }
+
+    private boolean canEvict(final int refCount, boolean writer)
+    {
+      if (refCount == 0 && storage != null) {
+        if (backPressureEnabled) {
+          if (!writer) {
+            boolean evict = true;
+            int blocks = 0;
+            // Search backwards from current block as opposed to searching forward from first block till current block as
+
+            // it is more likely to find the match quicker. No need to search more than maximum number of in memory
+            // permits.
+            for (Block temp = this.prev; (blocks < (MAX_COUNT_OF_INMEM_BLOCKS - 1)) && (temp != null); temp = temp.prev, ++blocks) {
+              if (temp.refCount.get() != 0) {
+                evict = false;
+                break;
+              }
+            }
+            logger.debug("Block {} evict {}", this, evict);
+            return evict;
+          } else {
+            return false;
+          }
+        } else {
+          return true;
+        }
       } else {
         logger.debug("Holding {} in memory due to {} references.", this, refCount);
+        return false;
       }
     }
 
@@ -937,7 +975,7 @@ public class DataList
       }
       //logger.debug("{}: switching to the next block {}->{}", this, da, da.next);
       next.acquire(true);
-      da.release(false);
+      da.release(false, false);
       da = next;
       size = 0;
       buffer = da.data;
@@ -1008,7 +1046,7 @@ public class DataList
     public void close()
     {
       if (da != null) {
-        da.release(false);
+        da.release(false, false);
         da = null;
         buffer = null;
       }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/f2d539d4/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
index 9baf111..2e75893 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
@@ -37,9 +37,9 @@ public class FastDataList extends DataList
     super(identifier);
   }
 
-  public FastDataList(String identifier, int blocksize, int numberOfCacheBlocks)
+  public FastDataList(String identifier, int blocksize, int numberOfCacheBlocks, boolean backPressureEnabled)
   {
-    super(identifier, blocksize, numberOfCacheBlocks);
+    super(identifier, blocksize, numberOfCacheBlocks, backPressureEnabled);
   }
 
   long item;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/f2d539d4/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index af55143..f819bb0 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -76,6 +76,8 @@ public class Server implements ServerListener
 
   private byte[] authToken;
 
+  private static final boolean BACK_PRESSURE_ENABLED = !Boolean.getBoolean("org.apache.apex.bufferserver.backpressure.disable");
+
   /**
    * @param port - port number to bind to or 0 to auto select a free port
    */
@@ -267,8 +269,8 @@ public class Server implements ServerListener
           DataList dl = publisherBuffers.get(upstream_identifier);
           if (dl == null) {
             dl = Tuple.FAST_VERSION.equals(request.getVersion()) ?
-                new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks) :
-                new DataList(upstream_identifier, blockSize, numberOfCacheBlocks);
+                new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks, BACK_PRESSURE_ENABLED) :
+                new DataList(upstream_identifier, blockSize, numberOfCacheBlocks, BACK_PRESSURE_ENABLED);
             DataList odl = publisherBuffers.putIfAbsent(upstream_identifier, dl);
             if (odl != null) {
               dl = odl;
@@ -387,8 +389,8 @@ public class Server implements ServerListener
       }
     } else {
       dl = Tuple.FAST_VERSION.equals(request.getVersion()) ?
-          new FastDataList(identifier, blockSize, numberOfCacheBlocks) :
-          new DataList(identifier, blockSize, numberOfCacheBlocks);
+          new FastDataList(identifier, blockSize, numberOfCacheBlocks, BACK_PRESSURE_ENABLED) :
+          new DataList(identifier, blockSize, numberOfCacheBlocks, BACK_PRESSURE_ENABLED);
       DataList odl = publisherBuffers.putIfAbsent(identifier, dl);
       if (odl != null) {
         dl = odl;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/f2d539d4/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java
index 755298a..86696f4 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java
@@ -60,7 +60,7 @@ public class DiskStorageTest
     eventloopClient = DefaultEventLoop.createEventLoop("client");
     eventloopClient.start();
 
-    instance = new Server(0, 1024,8);
+    instance = new Server(0, 1024, 8);
     instance.setSpoolStorage(new DiskStorage());
 
     address = instance.run(eventloopServer);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/f2d539d4/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index e188b60..4eed0de 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -446,7 +446,7 @@ public class StramLocalCluster implements Runnable, Controller
   {
     if (!perContainerBufferServer) {
       StreamingContainer.eventloop.start();
-      bufferServer = new Server(0, 1024 * 1024,8);
+      bufferServer = new Server(0, 1024 * 1024, 8);
       try {
         bufferServer.setSpoolStorage(new DiskStorage());
       } catch (IOException e) {

Reply via email to