Repository: incubator-apex-core Updated Branches: refs/heads/release-3.2 5b66f7a0a -> be34b5e86
APEXCORE-353 Allow autoFlushExecutor to process jobs from other datalists when there is more data to send and physical nodes for the current datalist are not ready to get more data.

Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/a9b4d3e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/a9b4d3e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/a9b4d3e6

Branch: refs/heads/release-3.2
Commit: a9b4d3e618c0bcd5090e4d36222983b63ceab9ab
Parents: fd2a7e8
Author: Vlad Rozov <[email protected]>
Authored: Tue Feb 23 21:46:16 2016 -0800
Committer: Vlad Rozov <[email protected]>
Committed: Tue Feb 23 21:46:16 2016 -0800

----------------------------------------------------------------------
 .../datatorrent/bufferserver/internal/DataList.java | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/a9b4d3e6/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
index 1f6c273..f39eca1 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
@@ -287,13 +287,13 @@ public class DataList
         @Override
         public void run()
         {
-          boolean atLeastOneListenerHasDataToSend;
-          do {
-            atLeastOneListenerHasDataToSend = false;
-            for (DataListener dl : all_listeners) {
-              atLeastOneListenerHasDataToSend |= dl.addedData();
-            }
-          } while (atLeastOneListenerHasDataToSend);
+          boolean atLeastOneListenerHasDataToSend = false;
+          for (DataListener dl : all_listeners) {
+            atLeastOneListenerHasDataToSend |= dl.addedData();
+          }
+          if (atLeastOneListenerHasDataToSend) {
+            future = autoFlushExecutor.submit(this);
+          }
         }
       });
     }
