Repository: apex-core Updated Branches: refs/heads/master f6e6672fa -> d55a3c592
APEXCORE-705 Backpressure when spooling is disabled. The publisher is suspended if ahead of subscriber by maximum number of blocks. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/d55a3c59 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/d55a3c59 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/d55a3c59 Branch: refs/heads/master Commit: d55a3c592afa1221ab88dd043e414895a00c6413 Parents: f6e6672 Author: Pramod Immaneni <[email protected]> Authored: Mon Apr 10 11:48:31 2017 -0700 Committer: Pramod Immaneni <[email protected]> Committed: Mon Jun 26 18:54:42 2017 -0700 ---------------------------------------------------------------------- .../bufferserver/internal/DataList.java | 122 ++++++++++++++----- .../datatorrent/bufferserver/server/Server.java | 2 +- 2 files changed, 93 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/d55a3c59/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java index d08b9fc..5813b56 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java @@ -390,12 +390,7 @@ public class DataList { boolean resumedSuspendedClients = false; if (numberOfInMemBlockPermits > 0) { - synchronized (suspendedClients) { - for (AbstractClient client : suspendedClients) { - resumedSuspendedClients |= client.resumeReadIfSuspended(); - } - suspendedClients.clear(); - } + resumedSuspendedClients = resumeSuspendedClients(); } else { logger.debug("Keeping clients: {} suspended, numberOfInMemBlockPermits={}, Listeners: {}", suspendedClients, numberOfInMemBlockPermits, all_listeners); @@ -403,9 +398,46 @@ public class DataList return resumedSuspendedClients; } + private boolean resumeSuspendedClients() + { + boolean resumedSuspendedClients = false; + synchronized (suspendedClients) { + for (AbstractClient client : suspendedClients) { + resumedSuspendedClients |= client.resumeReadIfSuspended(); + } + suspendedClients.clear(); + } + return resumedSuspendedClients; + } + public boolean isMemoryBlockAvailable() { - return (storage == null) || (numberOfInMemBlockPermits.get() > 0); + return (numberOfInMemBlockPermits.get() > 0); + } + + public boolean areSubscribersBehindByMax() + { + boolean behind = false; + if (backPressureEnabled) { + // Seek to max blocks and see if any block is in use beyond that + int count = 0; + synchronized (this) { + Block curr = last.prev; + // go back the max number of blocks + while ((curr != null) && (++count < (MAX_COUNT_OF_INMEM_BLOCKS - 2))) { + curr = curr.prev; + } + // check if any block is in use + while (!behind && (curr != null)) { + // Since acquire happens before release, because of concurrency, in a corner case scenario we might still count a + // subscriber as being max behind when it is transitioning over to the next block but that is ok as it will only + // result in publisher blocking for some time and resuming + behind = (curr.refCount.get() != 0); + curr = curr.prev; + } + } + } + return behind; } public byte[] newBuffer(final int size) @@ -821,37 +853,47 @@ public class DataList final int refCount = this.refCount.decrementAndGet(); if (canEvict(refCount, writer)) { assert (next != null); - final Runnable storer = getStorer(data, readingOffset, writingOffset, storage); - if (future != null && future.cancel(false)) { - logger.debug("Block {} future is cancelled", this); - } - final int numberOfInMemBlockPermits = DataList.this.numberOfInMemBlockPermits.get(); - if (wait && numberOfInMemBlockPermits == 0) { - future = null; - storer.run(); - } else if (numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS / 2) { - future = storageExecutor.submit(storer); + if (storage != null) { + evictBlock(wait); } else { - future = null; + if (!isPublisherAheadByMax()) { + resumeSuspendedClients(); + } } } } + private void evictBlock(boolean wait) + { + final Runnable storer = getStorer(data, readingOffset, writingOffset, storage); + if (future != null && future.cancel(false)) { + logger.debug("Block {} future is cancelled", this); + } + final int numberOfInMemBlockPermits = DataList.this.numberOfInMemBlockPermits.get(); + if (wait && numberOfInMemBlockPermits == 0) { + future = null; + storer.run(); + } else if (numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS / 2) { + future = storageExecutor.submit(storer); + } else { + future = null; + } + } + private boolean canEvict(final int refCount, boolean writer) { - if (refCount == 0 && storage != null) { + if (refCount == 0) { if (backPressureEnabled) { if (!writer) { boolean evict = true; - int blocks = 0; // Search backwards from current block as opposed to searching forward from first block till current block as - - // it is more likely to find the match quicker. No need to search more than maximum number of in memory - // permits. - for (Block temp = this.prev; (blocks < (MAX_COUNT_OF_INMEM_BLOCKS - 1)) && (temp != null); temp = temp.prev, ++blocks) { - if (temp.refCount.get() != 0) { - evict = false; - break; + // it is more likely to find the match quicker. + synchronized (this) { + for (Block temp = this.prev; temp != null; temp = temp.prev) { + if (temp.refCount.get() != 0) { + evict = false; + break; + } } } logger.debug("Block {} evict {}", this, evict); @@ -862,10 +904,30 @@ public class DataList } else { return true; } - } else { - logger.debug("Holding {} in memory due to {} references.", this, refCount); - return false; } + logger.debug("Not evicting {} due to {} references.", this, refCount); + return false; + } + + private boolean isPublisherAheadByMax() + { + boolean ahead = false; + if (backPressureEnabled) { + int blocks = MAX_COUNT_OF_INMEM_BLOCKS; + synchronized (DataList.this) { + Block curr = this.next; + // seek till the next block that is in use to determine possible active subscriber + while ((curr != null) && (curr.refCount.get() == 0)) { + curr = curr.next; + } + // find if publisher is ahead by max + while (!ahead && (curr != null)) { + ahead = (--blocks == 0); + curr = curr.next; + } + } + } + return ahead; } private Runnable getDiscarder() http://git-wip-us.apache.org/repos/asf/apex-core/blob/d55a3c59/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index 8a56b51..857e51e 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -807,7 +807,7 @@ public class Server extends AbstractServer private boolean switchToNewBuffer(final byte[] array, final int offset, final int size) { - if (datalist.isMemoryBlockAvailable()) { + if ((datalist.isMemoryBlockAvailable() || ((storage == null)) && !datalist.areSubscribersBehindByMax())) { final byte[] newBuffer = datalist.newBuffer(size); byteBuffer = ByteBuffer.wrap(newBuffer); if (array == null || array.length - offset == 0) {
