Repository: apex-core Updated Branches: refs/heads/master 84e6663a5 -> 576047e41
APEXCORE-641 Subscribers/DataListeners may not be scheduled to execute even when they have data to process Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/576047e4 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/576047e4 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/576047e4 Branch: refs/heads/master Commit: 576047e413d01c4997509b1dfbdb1176fe89db17 Parents: 84e6663 Author: Vlad Rozov <[email protected]> Authored: Wed Feb 8 10:15:20 2017 -0800 Committer: Vlad Rozov <[email protected]> Committed: Sat Mar 18 09:29:39 2017 -0700 ---------------------------------------------------------------------- .../bufferserver/internal/DataList.java | 95 ++++++++++++++++---- .../bufferserver/internal/DataListener.java | 2 +- .../bufferserver/internal/LogicalNode.java | 8 +- 3 files changed, 83 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/576047e4/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java index 3a446b6..84999fa 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java @@ -71,7 +71,7 @@ public class DataList private final Set<AbstractClient> suspendedClients = newHashSet(); private final AtomicInteger numberOfInMemBlockPermits; private MutableInt nextOffset = new MutableInt(); - private Future<?> future; + private final ListenersNotifier listenersNotifier = new ListenersNotifier(); private final boolean backPressureEnabled; public DataList(final String identifier, final int blockSize, final int numberOfCacheBlocks, final boolean backPressureEnabled) @@ -291,22 +291,7 @@ public class DataList public void notifyListeners() { - if (future == null || future.isDone() || future.isCancelled()) { - future = autoFlushExecutor.submit(new Runnable() - { - @Override - public void run() - { - boolean atLeastOneListenerHasDataToSend = false; - for (DataListener dl : all_listeners) { - atLeastOneListenerHasDataToSend |= dl.addedData(); - } - if (atLeastOneListenerHasDataToSend) { - future = autoFlushExecutor.submit(this); - } - } - }); - } + listenersNotifier.moreDataAvailable(); } public void setAutoFlushExecutor(final ExecutorService es) @@ -1066,5 +1051,81 @@ public class DataList } + private class ListenersNotifier implements Runnable + { + private volatile Future<?> future; + private boolean isMoreDataAvailable = false; + + private void moreDataAvailable() + { + final Future<?> future = this.future; + if (future == null || future.isDone() || future.isCancelled()) { + // Do not schedule a new task if there is an existing one that is still running or is waiting in the queue + this.future = autoFlushExecutor.submit(listenersNotifier); + } else { + synchronized (this) { + if (this.future == null) { + // future is set to null before run() exists, no need to check whether future isDone() or isCancelled() + this.future = autoFlushExecutor.submit(this); + } else { + isMoreDataAvailable = true; + } + } + } + } + + private boolean addedData() + { + boolean doesAtLeastOneListenerHaveDataToSend = false; + for (DataListener dl : all_listeners) { + try { + doesAtLeastOneListenerHaveDataToSend |= dl.addedData(false); + } catch (RuntimeException e) { + logger.error("{}: removing DataListener {} due to exception", DataList.this, dl, e); + removeDataListener(dl); + break; + } + } + return doesAtLeastOneListenerHaveDataToSend; + } + + private boolean checkIfListenersHaveDataToSendOnly() + { + for (DataListener dl : all_listeners) { + try { + if (dl.addedData(true)) { + return true; + } + } catch (RuntimeException e) { + logger.error("{}: removing DataListener {} due to exception", DataList.this, dl, e); + removeDataListener(dl); + return checkIfListenersHaveDataToSendOnly(); + } + } + return false; + } + + @Override + public void run() + { + try { + if (addedData() || checkIfListenersHaveDataToSendOnly()) { + future = autoFlushExecutor.submit(this); + } else { + synchronized (this) { + if (isMoreDataAvailable) { + isMoreDataAvailable = false; + future = autoFlushExecutor.submit(this); + } else { + future = null; + } + } + } + } catch (Exception e) { + logger.error("{}", DataList.this, e); + } + } + } + private static final Logger logger = LoggerFactory.getLogger(DataList.class); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/576047e4/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java index a6a1fab..e85b662 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java @@ -36,7 +36,7 @@ public interface DataListener /** */ - boolean addedData(); + boolean addedData(boolean checkIfListenerHaveDataToSendOnly); /** * http://git-wip-us.apache.org/repos/asf/apex-core/blob/576047e4/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java index 2921128..08a483a 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java @@ -221,7 +221,7 @@ public class LogicalNode implements DataListener } if (iterator.hasNext()) { - addedData(); + addedData(false); } } @@ -229,9 +229,9 @@ public class LogicalNode implements DataListener } @Override - public boolean addedData() + public boolean addedData(boolean checkIfListenerHaveDataToSendOnly) { - if (isReady()) { + if (!checkIfListenerHaveDataToSendOnly && isReady()) { if (caughtup) { try { /* @@ -302,7 +302,7 @@ public class LogicalNode implements DataListener catchUp(); } } - return !ready; + return iterator.hasNext(); } /**
