Repository: apex-core
Updated Branches:
  refs/heads/master f6e6672fa -> d55a3c592


APEXCORE-705 Backpressure when spooling is disabled. The publisher is suspended
if it is ahead of a subscriber by the maximum number of blocks.


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/d55a3c59
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/d55a3c59
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/d55a3c59

Branch: refs/heads/master
Commit: d55a3c592afa1221ab88dd043e414895a00c6413
Parents: f6e6672
Author: Pramod Immaneni <[email protected]>
Authored: Mon Apr 10 11:48:31 2017 -0700
Committer: Pramod Immaneni <[email protected]>
Committed: Mon Jun 26 18:54:42 2017 -0700

----------------------------------------------------------------------
 .../bufferserver/internal/DataList.java         | 122 ++++++++++++++-----
 .../datatorrent/bufferserver/server/Server.java |   2 +-
 2 files changed, 93 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/d55a3c59/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
index d08b9fc..5813b56 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
@@ -390,12 +390,7 @@ public class DataList
   {
     boolean resumedSuspendedClients = false;
     if (numberOfInMemBlockPermits > 0) {
-      synchronized (suspendedClients) {
-        for (AbstractClient client : suspendedClients) {
-          resumedSuspendedClients |= client.resumeReadIfSuspended();
-        }
-        suspendedClients.clear();
-      }
+      resumedSuspendedClients = resumeSuspendedClients();
     } else {
       logger.debug("Keeping clients: {} suspended, 
numberOfInMemBlockPermits={}, Listeners: {}", suspendedClients,
           numberOfInMemBlockPermits, all_listeners);
@@ -403,9 +398,46 @@ public class DataList
     return resumedSuspendedClients;
   }
 
+  private boolean resumeSuspendedClients()
+  {
+    boolean resumedSuspendedClients = false;
+    synchronized (suspendedClients) {
+      for (AbstractClient client : suspendedClients) {
+        resumedSuspendedClients |= client.resumeReadIfSuspended();
+      }
+      suspendedClients.clear();
+    }
+    return resumedSuspendedClients;
+  }
+
   public boolean isMemoryBlockAvailable()
   {
-    return (storage == null) || (numberOfInMemBlockPermits.get() > 0);
+    return (numberOfInMemBlockPermits.get() > 0);
+  }
+
+  public boolean areSubscribersBehindByMax()
+  {
+    boolean behind = false;
+    if (backPressureEnabled) {
+      // Seek to max blocks and see if any block is in use beyond that
+      int count = 0;
+      synchronized (this) {
+        Block curr = last.prev;
+        // go back the max number of blocks
+        while ((curr != null) && (++count < (MAX_COUNT_OF_INMEM_BLOCKS - 2))) {
+          curr = curr.prev;
+        }
+        // check if any block is in use
+        while (!behind && (curr != null)) {
+          // Since acquire happens before release, because of concurrency, in a corner case scenario we might still count a
+          // subscriber as being max behind when it is transitioning over to the next block but that is ok as it will only
+          // result in publisher blocking for some time and resuming
+          behind = (curr.refCount.get() != 0);
+          curr = curr.prev;
+        }
+      }
+    }
+    return behind;
   }
 
   public byte[] newBuffer(final int size)
@@ -821,37 +853,47 @@ public class DataList
       final int refCount = this.refCount.decrementAndGet();
       if (canEvict(refCount, writer)) {
         assert (next != null);
-        final Runnable storer = getStorer(data, readingOffset, writingOffset, 
storage);
-        if (future != null && future.cancel(false)) {
-          logger.debug("Block {} future is cancelled", this);
-        }
-        final int numberOfInMemBlockPermits = 
DataList.this.numberOfInMemBlockPermits.get();
-        if (wait && numberOfInMemBlockPermits == 0) {
-          future = null;
-          storer.run();
-        } else if (numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS / 2) {
-          future = storageExecutor.submit(storer);
+        if (storage != null) {
+          evictBlock(wait);
         } else {
-          future = null;
+          if (!isPublisherAheadByMax()) {
+            resumeSuspendedClients();
+          }
         }
       }
     }
 
+    private void evictBlock(boolean wait)
+    {
+      final Runnable storer = getStorer(data, readingOffset, writingOffset, 
storage);
+      if (future != null && future.cancel(false)) {
+        logger.debug("Block {} future is cancelled", this);
+      }
+      final int numberOfInMemBlockPermits = 
DataList.this.numberOfInMemBlockPermits.get();
+      if (wait && numberOfInMemBlockPermits == 0) {
+        future = null;
+        storer.run();
+      } else if (numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS / 2) {
+        future = storageExecutor.submit(storer);
+      } else {
+        future = null;
+      }
+    }
+
     private boolean canEvict(final int refCount, boolean writer)
     {
-      if (refCount == 0 && storage != null) {
+      if (refCount == 0) {
         if (backPressureEnabled) {
           if (!writer) {
             boolean evict = true;
-            int blocks = 0;
             // Search backwards from current block as opposed to searching forward from first block till current block as
-
-            // it is more likely to find the match quicker. No need to search more than maximum number of in memory
-            // permits.
-            for (Block temp = this.prev; (blocks < (MAX_COUNT_OF_INMEM_BLOCKS 
- 1)) && (temp != null); temp = temp.prev, ++blocks) {
-              if (temp.refCount.get() != 0) {
-                evict = false;
-                break;
+            // it is more likely to find the match quicker.
+            synchronized (this) {
+              for (Block temp = this.prev; temp != null; temp = temp.prev) {
+                if (temp.refCount.get() != 0) {
+                  evict = false;
+                  break;
+                }
               }
             }
             logger.debug("Block {} evict {}", this, evict);
@@ -862,10 +904,30 @@ public class DataList
         } else {
           return true;
         }
-      } else {
-        logger.debug("Holding {} in memory due to {} references.", this, 
refCount);
-        return false;
       }
+      logger.debug("Not evicting {} due to {} references.", this, refCount);
+      return false;
+    }
+
+    private boolean isPublisherAheadByMax()
+    {
+      boolean ahead = false;
+      if (backPressureEnabled) {
+        int blocks = MAX_COUNT_OF_INMEM_BLOCKS;
+        synchronized (DataList.this) {
+          Block curr = this.next;
+          // seek till the next block that is in use to determine possible active subscriber
+          while ((curr != null) && (curr.refCount.get() == 0)) {
+            curr = curr.next;
+          }
+          // find if publisher is ahead by max
+          while (!ahead && (curr != null)) {
+            ahead = (--blocks == 0);
+            curr = curr.next;
+          }
+        }
+      }
+      return ahead;
     }
 
     private Runnable getDiscarder()

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d55a3c59/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index 8a56b51..857e51e 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -807,7 +807,7 @@ public class Server extends AbstractServer
 
     private boolean switchToNewBuffer(final byte[] array, final int offset, 
final int size)
     {
-      if (datalist.isMemoryBlockAvailable()) {
+      if ((datalist.isMemoryBlockAvailable() || ((storage == null)) && 
!datalist.areSubscribersBehindByMax())) {
         final byte[] newBuffer = datalist.newBuffer(size);
         byteBuffer = ByteBuffer.wrap(newBuffer);
         if (array == null || array.length - offset == 0) {

Reply via email to