APEX-68: Buffer server should use a separate thread to spool blocks to disk
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/1b8aecf3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/1b8aecf3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/1b8aecf3 Branch: refs/heads/feature-module Commit: 1b8aecf3b429069b92a790a4aae2e41490934de7 Parents: f2a4071 Author: Vlad Rozov <[email protected]> Authored: Wed Sep 9 15:41:30 2015 -0700 Committer: Vlad Rozov <[email protected]> Committed: Wed Sep 9 15:41:30 2015 -0700 ---------------------------------------------------------------------- .../bufferserver/internal/DataList.java | 598 +++++++++++-------- .../bufferserver/internal/FastDataList.java | 64 +- .../datatorrent/bufferserver/server/Server.java | 101 +++- 3 files changed, 425 insertions(+), 338 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1b8aecf3/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java index baa052a..6806168 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java @@ -19,6 +19,8 @@ import java.io.IOException; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,8 +34,13 @@ import com.datatorrent.bufferserver.util.BitVector; import com.datatorrent.bufferserver.util.Codec; import com.datatorrent.bufferserver.util.SerializedData; import com.datatorrent.bufferserver.util.VarInt; +import com.datatorrent.netlet.AbstractClient; import com.datatorrent.netlet.util.VarInt.MutableInt; +import static com.google.common.collect.Lists.newArrayList; +import static com.google.common.collect.Maps.newHashMap; +import static com.google.common.collect.Sets.newHashSet; + /** * Maintains list of data and manages addition and deletion of the data<p> * <br> @@ -44,83 +51,137 @@ public class DataList { private final int MAX_COUNT_OF_INMEM_BLOCKS; protected final String identifier; - private final Integer blocksize; - private HashMap<BitVector, HashSet<DataListener>> listeners = new HashMap<BitVector, HashSet<DataListener>>(); - protected HashSet<DataListener> all_listeners = new HashSet<DataListener>(); + private final int blockSize; + private final HashMap<BitVector, HashSet<DataListener>> listeners = newHashMap(); + protected final HashSet<DataListener> all_listeners = newHashSet(); + protected final HashMap<String, DataListIterator> iterators = newHashMap(); protected Block first; protected Block last; protected Storage storage; - protected ExecutorService autoflushExecutor; + protected ExecutorService autoFlushExecutor; protected ExecutorService storageExecutor; + protected int size; + protected int processingOffset; + protected long baseSeconds; + private final List<AbstractClient> suspendedClients = newArrayList(); + private final AtomicInteger numberOfInMemBlockPermits; + private MutableInt nextOffset = new MutableInt(); + + public DataList(final String identifier, final int blockSize, final int numberOfCacheBlocks) + { + if (numberOfCacheBlocks < 1) { + throw new IllegalArgumentException("Invalid number of Data List Memory blocks " + numberOfCacheBlocks); + } + this.MAX_COUNT_OF_INMEM_BLOCKS = numberOfCacheBlocks; + numberOfInMemBlockPermits = new AtomicInteger(MAX_COUNT_OF_INMEM_BLOCKS - 1); + this.identifier = identifier; + this.blockSize = blockSize; + first = last = new Block(identifier, blockSize); + } - public int getBlockSize() + public DataList(String identifier) { - return blocksize; + /* + * We use 64MB (the default HDFS block getSize) as the getSize of the memory pool so we can flush the data 1 block at a time to the filesystem. + * we will use default value of 8 block sizes to be cached in memory + */ + this(identifier, 64 * 1024 * 1024, 8); } - public void rewind(int baseSeconds, int windowId) throws IOException + public int getBlockSize() { - long longWindowId = (long)baseSeconds << 32 | windowId; + return blockSize; + } - for (Block temp = first; temp != null; temp = temp.next) { + public void rewind(final int baseSeconds, final int windowId) throws IOException + { + final long longWindowId = (long)baseSeconds << 32 | windowId; + logger.debug("Rewinding {} from window ID {} to window ID {}", this, Codec.getStringWindowId(last.ending_window), Codec.getStringWindowId(longWindowId)); - if (temp.starting_window >= longWindowId || temp.ending_window > longWindowId) { - if (temp != last) { - temp.next = null; - last = temp; + int numberOfInMemBlockRewound = 0; + synchronized (this) { + for (Block temp = first; temp != null; temp = temp.next) { + if (temp.starting_window >= longWindowId || temp.ending_window > longWindowId) { + if (temp != last) { + last = temp; + do { + temp = temp.next; + temp.discard(false); + if (temp.data != null) { + temp.data = null; + numberOfInMemBlockRewound++; + } + } while (temp.next != null); + last.next = null; + last.acquire(true); + } + this.baseSeconds = last.rewind(longWindowId); + processingOffset = last.writingOffset; + size = 0; + break; } - - this.baseSeconds = temp.rewind(longWindowId); - processingOffset = temp.writingOffset; - size = 0; } } - for (DataListIterator dli : iterators.values()) { - dli.rewind(processingOffset); - } + /* + TODO: properly rewind Data List iterators, especially handle case when iterators point to blocks past the last block. + */ + + final int numberOfInMemBlockPermits = this.numberOfInMemBlockPermits.addAndGet(numberOfInMemBlockRewound); + assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.'; + resumeSuspendedClients(numberOfInMemBlockPermits); + logger.debug("Discarded {} in memory blocks during rewind. Number of in memory blocks permits {} after rewinding {}. ", numberOfInMemBlockRewound, numberOfInMemBlockPermits, this); + } public void reset() { + logger.debug("Resetting {}", this); listeners.clear(); all_listeners.clear(); - if (storage != null) { - while (first != null) { - if (first.uniqueIdentifier > 0) { - logger.debug("discarding {} {} in reset", identifier, first.uniqueIdentifier); - storage.discard(identifier, first.uniqueIdentifier); + synchronized (this) { + if (storage != null) { + Block temp = first; + while (temp != last) { + temp.discard(false); + temp.data = null; + temp = temp.next; } - first = first.next; } + first = last; } + numberOfInMemBlockPermits.set(MAX_COUNT_OF_INMEM_BLOCKS - 1); } - public void purge(int baseSeconds, int windowId) + public void purge(final int baseSeconds, final int windowId) { - long longWindowId = (long)baseSeconds << 32 | windowId; - logger.debug("purge request for windowId {}", Codec.getStringWindowId(longWindowId)); - - Block prev = null; - for (Block temp = first; temp != null && temp.starting_window <= longWindowId; temp = temp.next) { - if (temp.ending_window > longWindowId || temp == last) { - if (prev != null) { - first = temp; + final long longWindowId = (long)baseSeconds << 32 | windowId; + logger.debug("Purging {} from window ID {} to window ID {}", this, Codec.getStringWindowId(first.starting_window), Codec.getStringWindowId(longWindowId)); + + int numberOfInMemBlockPurged = 0; + synchronized (this) { + for (Block prev = null, temp = first; temp != null && temp.starting_window <= longWindowId; prev = temp, temp = temp.next) { + if (temp.ending_window > longWindowId || temp == last) { + if (prev != null) { + first = temp; + } + first.purge(longWindowId); + break; + } + temp.discard(false); + if (temp.data != null) { + temp.data = null; + numberOfInMemBlockPurged++; } - - first.purge(longWindowId); - break; } + } - if (storage != null && temp.uniqueIdentifier > 0) { - logger.debug("discarding {} {} in purge", identifier, temp.uniqueIdentifier); - - storage.discard(identifier, temp.uniqueIdentifier); - } + final int numberOfInMemBlockPermits = this.numberOfInMemBlockPermits.addAndGet(numberOfInMemBlockPurged); + assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.'; + resumeSuspendedClients(numberOfInMemBlockPermits); + logger.debug("Discarded {} in memory blocks during purge. Number of in memory blocks permits {} after purging {}. ", numberOfInMemBlockPurged, numberOfInMemBlockPermits, this); - prev = temp; - } } /** @@ -131,35 +192,6 @@ public class DataList return identifier; } - public DataList(String identifier, int blocksize, int numberOfCacheBlocks, int refCount) - { - this(identifier, blocksize, numberOfCacheBlocks); - first.refCount = refCount; - } - - public DataList(String identifier, int blocksize, int numberOfCacheBlocks) - { - this.MAX_COUNT_OF_INMEM_BLOCKS = numberOfCacheBlocks; - this.identifier = identifier; - this.blocksize = blocksize; - first = new Block(identifier, blocksize); - last = first; - } - - public DataList(String identifier) - { - /* - * We use 64MB (the default HDFS block getSize) as the getSize of the memory pool so we can flush the data 1 block at a time to the filesystem. - * we will use default value of 8 block sizes to be cached in memory - */ - this(identifier, 64 * 1024 * 1024, 8); - } - - MutableInt nextOffset = new MutableInt(); - long baseSeconds; - int size; - int processingOffset; - public void flush(final int writeOffset) { //logger.debug("size = {}, processingOffset = {}, nextOffset = {}, writeOffset = {}", size, processingOffset, nextOffset.integer, writeOffset); @@ -195,8 +227,7 @@ public class DataList last.starting_window = baseSeconds | bwt.getWindowId(); last.ending_window = last.starting_window; //logger.debug("assigned both window id {}", last); - } - else { + } else { last.ending_window = baseSeconds | bwt.getWindowId(); //logger.debug("assigned last window id {}", last); } @@ -209,8 +240,7 @@ public class DataList } processingOffset += size; size = 0; - } - else { + } else { if (writeOffset == last.data.length) { nextOffset.integer = 0; processingOffset = 0; @@ -218,12 +248,11 @@ public class DataList } break; } - } - while (true); + } while (true); last.writingOffset = writeOffset; - autoflushExecutor.submit(new Runnable() + autoFlushExecutor.submit(new Runnable() { @Override public void run() @@ -236,9 +265,9 @@ public class DataList }); } - public void setAutoflushExecutor(final ExecutorService es) + public void setAutoFlushExecutor(final ExecutorService es) { - autoflushExecutor = es; + autoFlushExecutor = es; } public void setSecondaryStorage(Storage storage, ExecutorService es) @@ -250,13 +279,16 @@ public class DataList /* * Iterator related functions. */ - protected final HashMap<String, DataListIterator> iterators = new HashMap<String, DataListIterator>(); - - public DataListIterator getIterator(Block block) + protected DataListIterator getIterator(final Block block) { return new DataListIterator(block); } + private synchronized Block getNextBlock(final Block block) + { + return block.next; + } + public Iterator<SerializedData> newIterator(String identifier, long windowId) { //logger.debug("request for a new iterator {} and {}", identifier, windowId); @@ -283,21 +315,17 @@ public class DataList */ public boolean delIterator(Iterator<SerializedData> iterator) { - boolean released = false; if (iterator instanceof DataListIterator) { DataListIterator dli = (DataListIterator)iterator; for (Entry<String, DataListIterator> e : iterators.entrySet()) { if (e.getValue() == dli) { - if (dli.da != null) { - dli.da.release(false); - } + dli.close(); iterators.remove(e.getKey()); - released = true; - break; + return true; } } } - return released; + return false; } public void addDataListener(DataListener dl) @@ -310,20 +338,17 @@ public class DataList HashSet<DataListener> set; if (listeners.containsKey(partition)) { set = listeners.get(partition); - } - else { + } else { set = new HashSet<DataListener>(); listeners.put(partition, set); } set.add(dl); } - } - else { + } else { HashSet<DataListener> set; if (listeners.containsKey(DataListener.NULL_PARTITION)) { set = listeners.get(DataListener.NULL_PARTITION); - } - else { + } else { set = new HashSet<DataListener>(); listeners.put(DataListener.NULL_PARTITION, set); } @@ -341,8 +366,7 @@ public class DataList listeners.get(partition).remove(dl); } } - } - else { + } else { if (listeners.containsKey(DataListener.NULL_PARTITION)) { listeners.get(DataListener.NULL_PARTITION).remove(dl); } @@ -351,43 +375,46 @@ public class DataList all_listeners.remove(dl); } - public void addBuffer(byte[] array) + public boolean suspendRead(final AbstractClient client) { - last.next = new Block(identifier, array); - last.next.starting_window = last.ending_window; - last.next.ending_window = last.ending_window; - last = last.next; - - //logger.debug("addbuffer last = {}", last); - int inmemBlockCount; + synchronized (suspendedClients) { + return client.suspendReadIfResumed() && suspendedClients.add(client); + } + } - inmemBlockCount = 0; - for (Block temp = first; temp != null; temp = temp.next) { - if (temp.data != null) { - inmemBlockCount++; + public boolean resumeSuspendedClients(final int numberOfInMemBlockPermits) + { + boolean resumedSuspendedClients = false; + if (numberOfInMemBlockPermits > 0) { + synchronized (suspendedClients) { + for (AbstractClient client : suspendedClients) { + resumedSuspendedClients |= client.resumeReadIfSuspended(); + } + suspendedClients.clear(); } } + return resumedSuspendedClients; + } - if (inmemBlockCount >= MAX_COUNT_OF_INMEM_BLOCKS) { - //logger.debug("InmemBlockCount before releaes {}", inmemBlockCount); - for (Block temp = first; temp != null; temp = temp.next) { - boolean found = false; - for (DataListIterator iterator : iterators.values()) { - if (iterator.da == temp) { - found = true; - break; - } - } + public boolean isMemoryBlockAvailable() + { + return numberOfInMemBlockPermits.get() > 0; + } - if (!found && temp.data != null) { - temp.release(true); - if (--inmemBlockCount < MAX_COUNT_OF_INMEM_BLOCKS) { - break; - } - } - } - //logger.debug("InmemBlockCount after release {}", inmemBlockCount); + public byte[] newBuffer() + { + return new byte[blockSize]; + } + + public void addBuffer(byte[] array) + { + final int numberOfInMemBlockPermits = this.numberOfInMemBlockPermits.decrementAndGet(); + if (numberOfInMemBlockPermits < 0) { + logger.warn("Exceeded allowed memory block allocation by {}", -numberOfInMemBlockPermits); } + last.next = new Block(identifier, array, last.ending_window, last.ending_window); + last.release(false); + last = last.next; } public byte[] getBuffer(long windowId) @@ -461,6 +488,12 @@ public class DataList return status; } + @Override + public String toString() + { + return getClass().getName() + '@' + Integer.toHexString(hashCode()) + " {" + identifier + '}'; + } + /** * <p>Block class.</p> * @@ -484,7 +517,7 @@ public class DataList /** * The starting window which is available in this data array. */ - long starting_window = -1; + long starting_window; /** * the ending window which is available in this data array */ @@ -500,7 +533,8 @@ public class DataList /** * how count of references to this block. */ - int refCount; + AtomicInteger refCount; + Future future; public Block(String id, int size) { @@ -509,9 +543,17 @@ public class DataList public Block(String id, byte[] array) { + this(id, array, -1, 0); + } + + public Block(final String id, final byte[] array, final long starting_window, final long ending_window) + { identifier = id; data = array; - refCount = 1; + refCount = new AtomicInteger(1); + this.starting_window = starting_window; + this.ending_window = ending_window; + //logger.debug("Allocated new {}", this); } void getNextData(SerializedData current) @@ -530,27 +572,28 @@ public class DataList public long rewind(long windowId) { long bs = starting_window & 0x7fffffff00000000L; - DataListIterator dli = getIterator(this); - done: - while (dli.hasNext()) { - SerializedData sd = dli.next(); - switch (sd.buffer[sd.dataOffset]) { - case MessageType.RESET_WINDOW_VALUE: - ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); - bs = (long)rwt.getBaseSeconds() << 32; - if (bs > windowId) { - writingOffset = sd.offset; - break done; - } - break; + try (DataListIterator dli = getIterator(this)) { + done: + while (dli.hasNext()) { + SerializedData sd = dli.next(); + switch (sd.buffer[sd.dataOffset]) { + case MessageType.RESET_WINDOW_VALUE: + ResetWindowTuple rwt = (ResetWindowTuple) Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); + bs = (long)rwt.getBaseSeconds() << 32; + if (bs > windowId) { + writingOffset = sd.offset; + break done; + } + break; - case MessageType.BEGIN_WINDOW_VALUE: - BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); - if ((bs | bwt.getWindowId()) >= windowId) { - writingOffset = sd.offset; - break done; - } - break; + case MessageType.BEGIN_WINDOW_VALUE: + BeginWindowTuple bwt = (BeginWindowTuple) Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); + if ((bs | bwt.getWindowId()) >= windowId) { + writingOffset = sd.offset; + break done; + } + break; + } } } @@ -558,16 +601,12 @@ public class DataList starting_window = windowId; ending_window = windowId; //logger.debug("assigned both window id {}", this); - } - else if (windowId < ending_window) { + } else if (windowId < ending_window) { ending_window = windowId; //logger.debug("assigned end window id {}", this); } - if (uniqueIdentifier != 0) { - storage.discard(identifier, uniqueIdentifier); - uniqueIdentifier = 0; - } + discard(false); return bs; } @@ -580,39 +619,40 @@ public class DataList long bs = starting_window & 0xffffffff00000000L; SerializedData lastReset = null; - DataListIterator dli = getIterator(this); - done: - while (dli.hasNext()) { - SerializedData sd = dli.next(); - switch (sd.buffer[sd.dataOffset]) { - case MessageType.RESET_WINDOW_VALUE: - ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); - bs = (long)rwt.getBaseSeconds() << 32; - lastReset = sd; - break; - - case MessageType.BEGIN_WINDOW_VALUE: - BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); - if ((bs | bwt.getWindowId()) > longWindowId) { - found = true; - if (lastReset != null) { + try (DataListIterator dli = getIterator(this)) { + done: + while (dli.hasNext()) { + SerializedData sd = dli.next(); + switch (sd.buffer[sd.dataOffset]) { + case MessageType.RESET_WINDOW_VALUE: + ResetWindowTuple rwt = (ResetWindowTuple) Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); + bs = (long)rwt.getBaseSeconds() << 32; + lastReset = sd; + break; + + case MessageType.BEGIN_WINDOW_VALUE: + BeginWindowTuple bwt = (BeginWindowTuple) Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); + if ((bs | bwt.getWindowId()) > longWindowId) { + found = true; + if (lastReset != null) { /* * Restore the last Reset tuple if there was any and adjust the writingOffset to the beginning of the reset tuple. */ - if (sd.offset >= lastReset.length) { - sd.offset -= lastReset.length; - if (!(sd.buffer == lastReset.buffer && sd.offset == lastReset.offset)) { - System.arraycopy(lastReset.buffer, lastReset.offset, sd.buffer, sd.offset, lastReset.length); + if (sd.offset >= lastReset.length) { + sd.offset -= lastReset.length; + if (!(sd.buffer == lastReset.buffer && sd.offset == lastReset.offset)) { + System.arraycopy(lastReset.buffer, lastReset.offset, sd.buffer, sd.offset, lastReset.length); + } } + + this.starting_window = bs | bwt.getWindowId(); + this.readingOffset = sd.offset; + //logger.debug("assigned starting window id {}", this); } - this.starting_window = bs | bwt.getWindowId(); - this.readingOffset = sd.offset; - //logger.debug("assigned starting window id {}", this); + break done; } - - break done; - } + } } } @@ -654,46 +694,44 @@ public class DataList logger.warn("Unhandled condition while purging the data purge to offset {}", sd.offset); } - if (uniqueIdentifier != 0) { - storage.discard(identifier, uniqueIdentifier); - uniqueIdentifier = 0; - } + discard(false); } } - private Runnable getRetriever(final int uniqueIdentifier, final Storage storage) + private Runnable getRetriever() { return new Runnable() { @Override public void run() { - byte[] lData = storage.retrieve(identifier, uniqueIdentifier); + byte[] data = storage.retrieve(identifier, uniqueIdentifier); synchronized (Block.this) { - data = lData; + Block.this.data = data; readingOffset = 0; writingOffset = data.length; - if (refCount > 1) { + if (refCount.get() > 1) { Block.this.notifyAll(); } + int numberOfInMemBlockPermits = DataList.this.numberOfInMemBlockPermits.decrementAndGet(); + if (numberOfInMemBlockPermits < 0) { + logger.warn("Exceeded allowed memory block allocation by {}", -numberOfInMemBlockPermits); + } } } - }; } - synchronized void acquire(boolean wait) + protected void acquire(boolean wait) { - if (refCount++ == 0 && uniqueIdentifier > 0 && storage != null) { - assert (data == null); + if (refCount.getAndIncrement() == 0 && storage != null && data == null) { + final Runnable retriever = getRetriever(); if (wait) { - getRetriever(uniqueIdentifier, storage).run(); + retriever.run(); + } else { + future = storageExecutor.submit(retriever); } - else { - storageExecutor.submit(getRetriever(uniqueIdentifier, storage)); - } - } - else if (wait && data == null) { + } else if (wait && data == null) { try { wait(); } @@ -710,35 +748,70 @@ public class DataList @Override public void run() { - int i = storage.store(identifier, data, readingOffset, writingOffset); - if (i == 0) { + if (uniqueIdentifier == 0) { + uniqueIdentifier = storage.store(identifier, data, readingOffset, writingOffset); + } + if (uniqueIdentifier == 0) { logger.warn("Storage returned unexpectedly, please check the status of the spool directory!"); } else { + //logger.debug("Spooled {} to disk", Block.this); synchronized (Block.this) { - Block.this.uniqueIdentifier = i; - if (refCount == 0) { + if (refCount.get() == 0) { Block.this.data = null; } } + int numberOfInMemBlockPermits = DataList.this.numberOfInMemBlockPermits.incrementAndGet(); + assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.'; + resumeSuspendedClients(numberOfInMemBlockPermits); } } + }; + } + + protected void release(boolean wait) + { + final int refCount = this.refCount.decrementAndGet(); + if (refCount == 0 && storage != null) { + assert (next != null); + final Runnable storer = getStorer(data, readingOffset, writingOffset, storage); + if (wait && numberOfInMemBlockPermits.get() == 0) { + storer.run(); + } else if (numberOfInMemBlockPermits.get() < MAX_COUNT_OF_INMEM_BLOCKS/2) { + future = storageExecutor.submit(storer); + } + } else { + logger.debug("Holding {} in memory due to {} references.", this, refCount); + } + } + private Runnable getDiscarder() + { + return new Runnable() + { + @Override + public void run() + { + if (uniqueIdentifier > 0) { + logger.debug("Discarding {}", Block.this); + storage.discard(identifier, uniqueIdentifier); + uniqueIdentifier = 0; + } + } }; } - synchronized void release(boolean wait) + protected void discard(final boolean wait) { - if (--refCount == 0 && storage != null) { - if (uniqueIdentifier != 0) { - data = null; - return; + if (storage != null) { + if (future != null) { + future.cancel(false); } + final Runnable discarder = getDiscarder(); if (wait) { - getStorer(data, readingOffset, writingOffset, storage).run(); - } - else { - storageExecutor.submit(getStorer(data, readingOffset, writingOffset, storage)); + discarder.run(); + } else { + future = storageExecutor.submit(discarder); } } } @@ -746,10 +819,10 @@ public class DataList @Override public String toString() { - return "Block{" + "identifier=" + identifier + ", data=" + (data == null ? "null" : data.length) + return getClass().getName() + '@' + Integer.toHexString(hashCode()) + "{identifier=" + identifier + ", data=" + (data == null ? "null" : data.length) + ", readingOffset=" + readingOffset + ", writingOffset=" + writingOffset + ", starting_window=" + Codec.getStringWindowId(starting_window) + ", ending_window=" + Codec.getStringWindowId(ending_window) - + ", uniqueIdentifier=" + uniqueIdentifier + ", next=" + (next == null ? "null" : next.identifier) + + ", refCount=" + refCount.get() + ", uniqueIdentifier=" + uniqueIdentifier + ", next=" + (next == null ? "null" : next.identifier) + '}'; } @@ -760,7 +833,7 @@ public class DataList * * @since 0.3.2 */ - public class DataListIterator implements Iterator<SerializedData> + public class DataListIterator implements Iterator<SerializedData>, AutoCloseable { Block da; SerializedData current; @@ -792,12 +865,28 @@ public class DataList return readOffset; } + protected boolean switchToNextBlock() + { + Block next = getNextBlock(da); + if (next == null) { + return false; + } + //logger.debug("{}: switching to the next block {}->{}", this, da, da.next); + next.acquire(true); + da.release(false); + da = next; + size = 0; + buffer = da.data; + readOffset = da.readingOffset; + return true; + } + /** * * @return boolean */ @Override - public synchronized boolean hasNext() + public boolean hasNext() { while (size == 0) { size = VarInt.read(buffer, readOffset, da.writingOffset, nextOffset); @@ -810,53 +899,26 @@ public class DataList case -2: case -1: case 0: - if (da.writingOffset == buffer.length) { - if (da.next == null) { - return false; - } - - da.release(false); - da.next.acquire(true); - da = da.next; - size = 0; - buffer = da.data; - readOffset = da.readingOffset; - } - else { - return false; + if (da.writingOffset == buffer.length && switchToNextBlock()) { + continue; } + return false; } } - while (true) { - if (nextOffset.integer + size <= da.writingOffset) { - current = new SerializedData(buffer, readOffset, size + nextOffset.integer - readOffset); - current.dataOffset = nextOffset.integer; - //if (buffer[current.dataOffset] == MessageType.BEGIN_WINDOW_VALUE || buffer[current.dataOffset] == MessageType.END_WINDOW_VALUE) { - // Tuple t = Tuple.getTuple(current.buffer, current.dataOffset, current.length - current.dataOffset + current.offset); - // logger.debug("next t = {}", t); - //} - return true; - } - else { - if (da.writingOffset == buffer.length) { - if (da.next == null) { - return false; - } - else { - da.release(false); - da.next.acquire(true); - da = da.next; - size = 0; - readOffset = nextOffset.integer = da.readingOffset; - buffer = da.data; - } - } - else { - return false; - } - } + if (nextOffset.integer + size <= da.writingOffset) { + current = new SerializedData(buffer, readOffset, size + nextOffset.integer - readOffset); + current.dataOffset = nextOffset.integer; + //if (buffer[current.dataOffset] == MessageType.BEGIN_WINDOW_VALUE || buffer[current.dataOffset] == MessageType.END_WINDOW_VALUE) { + // Tuple t = Tuple.getTuple(current.buffer, current.dataOffset, current.length - current.dataOffset + current.offset); + // logger.debug("next t = {}", t); + //} + return true; + } else if (da.writingOffset == buffer.length && switchToNextBlock()) { + nextOffset.integer = da.readingOffset; + return hasNext(); } + return false; } /** @@ -882,6 +944,16 @@ public class DataList current.buffer[current.dataOffset] = MessageType.NO_MESSAGE_VALUE; } + @Override + public void close() + { + if (da != null) { + da.release(false); + da = null; + buffer = null; + } + } + void rewind(int processingOffset) { readOffset = processingOffset; http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1b8aecf3/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java index d260b37..fe0d9f4 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java @@ -39,12 +39,6 @@ public class FastDataList extends DataList super(identifier, blocksize, numberOfCacheBlocks); } - public FastDataList(String identifier, int blocksize, int numberOfCacheBlocks, int refCount) - { - super(identifier, blocksize, numberOfCacheBlocks, refCount); - } - - long item; @Override @@ -102,7 +96,7 @@ public class FastDataList extends DataList last.writingOffset = writeOffset; - autoflushExecutor.submit(new Runnable() + autoFlushExecutor.submit(new Runnable() { @Override public void run() @@ -116,7 +110,7 @@ public class FastDataList extends DataList } @Override - public FastDataListIterator getIterator(Block block) + protected FastDataListIterator getIterator(Block block) { return new FastDataListIterator(block); } @@ -188,59 +182,37 @@ public class FastDataList extends DataList } @Override - public synchronized boolean hasNext() + public boolean hasNext() { while (size == 0) { if (da.writingOffset - readOffset >= 2) { size = buffer[readOffset]; size |= (buffer[readOffset + 1] << 8); - } - else { - if (da.writingOffset == buffer.length) { - if (da.next == null) { - return false; - } - - da.release(false); - da.next.acquire(true); - da = da.next; - size = 0; - buffer = da.data; - readOffset = da.readingOffset; - } - else { + } else { + if (da.writingOffset == buffer.length && switchToNextBlock()) { + continue; + } else { return false; } } } - while (true) { - if (readOffset + size + 2 <= da.writingOffset) { - current = new SerializedData(buffer, readOffset, size + 2); - current.dataOffset = readOffset + 2; - return true; - } - else { - if (da.writingOffset == buffer.length) { - if (da.next == null) { - return false; - } - else { - da.release(false); - da.next.acquire(true); - da = da.next; - size = 0; - readOffset = nextOffset.integer = da.readingOffset; - buffer = da.data; - } - } - else { + if (readOffset + size + 2 <= da.writingOffset) { + current = new SerializedData(buffer, readOffset, size + 2); + current.dataOffset = readOffset + 2; + return true; + } else { + if (da.writingOffset == buffer.length) { + if (!switchToNextBlock()) { return false; } + nextOffset.integer = da.readingOffset; + return hasNext(); + } else { + return false; } } } - } private static final Logger logger = LoggerFactory.getLogger(FastDataList.class); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1b8aecf3/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index 7fb4823..33a2442 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -97,16 +97,16 @@ public class Server implements ServerListener @Override public void unregistered(SelectionKey key) { - serverHelperExecutor.shutdown(); - storageHelperExecutor.shutdown(); - try { - serverHelperExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS); - } - catch (InterruptedException ex) { - logger.debug("Executor Termination", ex); - } - logger.info("Server stopped listening at {}", address); - } + serverHelperExecutor.shutdown(); + storageHelperExecutor.shutdown(); + try { + serverHelperExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS); + } + catch (InterruptedException ex) { + logger.debug("Executor Termination", ex); + } + logger.info("Server stopped listening at {}", address); + } public synchronized InetSocketAddress run(EventLoop eventloop) { @@ -262,7 +262,7 @@ public class Server implements ServerListener //logger.debug("old list = {}", dl); } else { - dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks, 0) : new DataList(upstream_identifier, blockSize, numberOfCacheBlocks, 0); + dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks) : new DataList(upstream_identifier, blockSize, numberOfCacheBlocks); publisherBuffers.put(upstream_identifier, dl); //logger.debug("new list = {}", dl); } @@ -401,7 +401,7 @@ public class Server implements ServerListener PublishRequestTuple publisherRequest = (PublishRequestTuple)request; DataList dl = handlePublisherRequest(publisherRequest, this); - dl.setAutoflushExecutor(serverHelperExecutor); + dl.setAutoFlushExecutor(serverHelperExecutor); Publisher publisher; if (publisherRequest.getVersion().equals(Tuple.FAST_VERSION)) { @@ -616,6 +616,32 @@ public class Server implements ServerListener dirty = true; } + /** + * Schedules a task to conditionally resume I/O channel read operations. No-op if {@linkplain java.nio.channels.SelectionKey#OP_READ OP_READ} + * is already set in the key {@linkplain java.nio.channels.SelectionKey#interestOps() interestOps}. Otherwise, calls {@linkplain #read(int) read(0)} + * to process data left in the Publisher read buffer and registers {@linkplain java.nio.channels.SelectionKey#OP_READ OP_READ} in the key + * {@linkplain java.nio.channels.SelectionKey#interestOps() interestOps}. + * @return true + */ + @Override + public boolean resumeReadIfSuspended() + { + eventloop.submit(new Runnable() + { + @Override + public void run() + { + final int interestOps = key.interestOps(); + if ((interestOps & SelectionKey.OP_READ) == 0) { + logger.debug("Resuming read on key {} with attachment {}", key, key.attachment()); + read(0); + key.interestOps(interestOps | SelectionKey.OP_READ); + } + } + }); + return true; + } + @Override public void read(int len) { @@ -634,7 +660,9 @@ public class Server implements ServerListener * so we allocate a new byteBuffer and copy over the partially written data to the * new byteBuffer and start as if we always had full room but not enough data. */ - switchToNewBuffer(buffer, readOffset); + if (!switchToNewBufferOrSuspendRead(buffer, readOffset)) { + return; + } } } else if (dirty) { @@ -660,10 +688,13 @@ public class Server implements ServerListener /* * hit wall while writing serialized data, so have to allocate a new byteBuffer. */ - switchToNewBuffer(buffer, readOffset - VarInt.getSize(size)); + if (!switchToNewBufferOrSuspendRead(buffer, readOffset - VarInt.getSize(size))) { + readOffset -= VarInt.getSize(size); + size = 0; + return; + } size = 0; - } - else if (dirty) { + } else if (dirty) { dirty = false; datalist.flush(writeOffset); } @@ -673,21 +704,33 @@ public class Server implements ServerListener while (true); } - public void switchToNewBuffer(byte[] array, int offset) + private boolean switchToNewBufferOrSuspendRead(final byte[] array, final int offset) { - byte[] newBuffer = new byte[datalist.getBlockSize()]; - byteBuffer = ByteBuffer.wrap(newBuffer); - if (array == null || array.length - offset == 0) { - writeOffset = 0; + if (switchToNewBuffer(array, offset)) { + return true; } - else { - writeOffset = array.length - offset; - System.arraycopy(buffer, offset, newBuffer, 0, writeOffset); - byteBuffer.position(writeOffset); + datalist.suspendRead(this); + return false; + } + + private boolean switchToNewBuffer(final byte[] array, final int offset) + { + if (datalist.isMemoryBlockAvailable()) { + final byte[] newBuffer = datalist.newBuffer(); + byteBuffer = ByteBuffer.wrap(newBuffer); + if (array == null || array.length - offset == 0) { + writeOffset = 0; + } else { + writeOffset = array.length - offset; + System.arraycopy(buffer, offset, newBuffer, 0, writeOffset); + byteBuffer.position(writeOffset); + } + buffer = newBuffer; + readOffset = 0; + datalist.addBuffer(buffer); + return true; } - buffer = newBuffer; - readOffset = 0; - datalist.addBuffer(buffer); + return false; } @Override @@ -714,7 +757,7 @@ public class Server implements ServerListener @Override public String toString() { - return "Server.Publisher{" + "datalist=" + datalist + '}'; + return getClass().getName() + '@' + Integer.toHexString(hashCode()) + " {datalist=" + datalist + '}'; } private volatile boolean torndown;
