Repository: apex-core Updated Branches: refs/heads/master abc836ca1 -> d80501bdc
APEXCORE-570 Back pressure implementation to suspend publisher when subscriber is slow and the buffer fills up Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/f2d539d4 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/f2d539d4 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/f2d539d4 Branch: refs/heads/master Commit: f2d539d4d88e76c86ea23b803ac38c7a2584a18f Parents: 7ea7f60 Author: Pramod Immaneni <[email protected]> Authored: Sat Jan 14 01:33:07 2017 -0800 Committer: Pramod Immaneni <[email protected]> Committed: Sat Feb 18 07:24:46 2017 +0530 ---------------------------------------------------------------------- .../bufferserver/internal/DataList.java | 52 +++++++++++++++++--- .../bufferserver/internal/FastDataList.java | 4 +- .../datatorrent/bufferserver/server/Server.java | 10 ++-- .../bufferserver/storage/DiskStorageTest.java | 2 +- .../datatorrent/stram/StramLocalCluster.java | 2 +- 5 files changed, 55 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/f2d539d4/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java index 3f596d9..3a446b6 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java @@ -72,8 +72,9 @@ public class DataList private final AtomicInteger numberOfInMemBlockPermits; private MutableInt nextOffset = new MutableInt(); private Future<?> future; + private final boolean backPressureEnabled; - public DataList(final String identifier, final int blockSize, final int numberOfCacheBlocks) + public DataList(final String identifier, final int blockSize, final int numberOfCacheBlocks, final boolean backPressureEnabled) { if (numberOfCacheBlocks < 1) { throw new IllegalArgumentException("Invalid number of Data List Memory blocks " + numberOfCacheBlocks); @@ -82,6 +83,7 @@ public class DataList numberOfInMemBlockPermits = new AtomicInteger(MAX_COUNT_OF_INMEM_BLOCKS - 1); this.identifier = identifier; this.blockSize = blockSize; + this.backPressureEnabled = backPressureEnabled; first = last = new Block(identifier, blockSize); } @@ -91,7 +93,7 @@ public class DataList * We use 64MB (the default HDFS block getSize) as the getSize of the memory pool so we can flush the data 1 block * at a time to the filesystem. We will use default value of 8 block sizes to be cached in memory */ - this(identifier, 64 * 1024 * 1024, 8); + this(identifier, 64 * 1024 * 1024, 8, true); } public int getBlockSize() @@ -172,6 +174,7 @@ public class DataList } } first = last; + first.prev = null; } numberOfInMemBlockPermits.set(MAX_COUNT_OF_INMEM_BLOCKS - 1); } @@ -188,6 +191,7 @@ public class DataList if (temp.ending_window > windowId || temp == last) { if (prev != null) { first = temp; + first.prev = null; } first.purge(windowId); break; @@ -436,7 +440,8 @@ public class DataList logger.warn("Exceeded allowed memory block allocation by {}", -numberOfInMemBlockPermits); } last.next = new Block(identifier, array, last.ending_window, last.ending_window); - last.release(false); + last.next.prev = last; + last.release(false, true); last = last.next; } @@ -554,6 +559,10 @@ public class DataList */ Block next; /** + * the previous in the chain + */ + Block prev; + /** * how count of references to this block. */ private final AtomicInteger refCount; @@ -822,10 +831,10 @@ public class DataList }; } - protected void release(boolean wait) + protected void release(boolean wait, boolean writer) { final int refCount = this.refCount.decrementAndGet(); - if (refCount == 0 && storage != null) { + if (canEvict(refCount, writer)) { assert (next != null); final Runnable storer = getStorer(data, readingOffset, writingOffset, storage); if (future != null && future.cancel(false)) { @@ -840,8 +849,37 @@ public class DataList } else { future = null; } + } + } + + private boolean canEvict(final int refCount, boolean writer) + { + if (refCount == 0 && storage != null) { + if (backPressureEnabled) { + if (!writer) { + boolean evict = true; + int blocks = 0; + // Search backwards from current block as opposed to searching forward from first block till current block as + + // it is more likely to find the match quicker. No need to search more than maximum number of in memory + // permits. + for (Block temp = this.prev; (blocks < (MAX_COUNT_OF_INMEM_BLOCKS - 1)) && (temp != null); temp = temp.prev, ++blocks) { + if (temp.refCount.get() != 0) { + evict = false; + break; + } + } + logger.debug("Block {} evict {}", this, evict); + return evict; + } else { + return false; + } + } else { + return true; + } } else { logger.debug("Holding {} in memory due to {} references.", this, refCount); + return false; } } @@ -937,7 +975,7 @@ public class DataList } //logger.debug("{}: switching to the next block {}->{}", this, da, da.next); next.acquire(true); - da.release(false); + da.release(false, false); da = next; size = 0; buffer = da.data; @@ -1008,7 +1046,7 @@ public class DataList public void close() { if (da != null) { - da.release(false); + da.release(false, false); da = null; buffer = null; } http://git-wip-us.apache.org/repos/asf/apex-core/blob/f2d539d4/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java index 9baf111..2e75893 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java @@ -37,9 +37,9 @@ public class FastDataList extends DataList super(identifier); } - public FastDataList(String identifier, int blocksize, int numberOfCacheBlocks) + public FastDataList(String identifier, int blocksize, int numberOfCacheBlocks, boolean backPressureEnabled) { - super(identifier, blocksize, numberOfCacheBlocks); + super(identifier, blocksize, numberOfCacheBlocks, backPressureEnabled); } long item; http://git-wip-us.apache.org/repos/asf/apex-core/blob/f2d539d4/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index af55143..f819bb0 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -76,6 +76,8 @@ public class Server implements ServerListener private byte[] authToken; + private static final boolean BACK_PRESSURE_ENABLED = !Boolean.getBoolean("org.apache.apex.bufferserver.backpressure.disable"); + /** * @param port - port number to bind to or 0 to auto select a free port */ @@ -267,8 +269,8 @@ public class Server implements ServerListener DataList dl = publisherBuffers.get(upstream_identifier); if (dl == null) { dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? - new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks) : - new DataList(upstream_identifier, blockSize, numberOfCacheBlocks); + new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks, BACK_PRESSURE_ENABLED) : + new DataList(upstream_identifier, blockSize, numberOfCacheBlocks, BACK_PRESSURE_ENABLED); DataList odl = publisherBuffers.putIfAbsent(upstream_identifier, dl); if (odl != null) { dl = odl; @@ -387,8 +389,8 @@ public class Server implements ServerListener } } else { dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? - new FastDataList(identifier, blockSize, numberOfCacheBlocks) : - new DataList(identifier, blockSize, numberOfCacheBlocks); + new FastDataList(identifier, blockSize, numberOfCacheBlocks, BACK_PRESSURE_ENABLED) : + new DataList(identifier, blockSize, numberOfCacheBlocks, BACK_PRESSURE_ENABLED); DataList odl = publisherBuffers.putIfAbsent(identifier, dl); if (odl != null) { dl = odl; http://git-wip-us.apache.org/repos/asf/apex-core/blob/f2d539d4/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java index 755298a..86696f4 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java @@ -60,7 +60,7 @@ public class DiskStorageTest eventloopClient = DefaultEventLoop.createEventLoop("client"); eventloopClient.start(); - instance = new Server(0, 1024,8); + instance = new Server(0, 1024, 8); instance.setSpoolStorage(new DiskStorage()); address = instance.run(eventloopServer); http://git-wip-us.apache.org/repos/asf/apex-core/blob/f2d539d4/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java index e188b60..4eed0de 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java +++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java @@ -446,7 +446,7 @@ public class StramLocalCluster implements Runnable, Controller { if (!perContainerBufferServer) { StreamingContainer.eventloop.start(); - bufferServer = new Server(0, 1024 * 1024,8); + bufferServer = new Server(0, 1024 * 1024, 8); try { bufferServer.setSpoolStorage(new DiskStorage()); } catch (IOException e) {
