Repository: incubator-apex-core Updated Branches: refs/heads/release-3.3 707bfbcf9 -> da2f0edb2
APEXCORE-353 Allow autoFlushExecutor to process jobs from other datalists when there is more data to send and physical nodes for the current datalist are not ready to get more data. Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/da2f0edb Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/da2f0edb Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/da2f0edb Branch: refs/heads/release-3.3 Commit: da2f0edb2c4e04439c37741ba036fa886d5c7ee5 Parents: 707bfbc Author: Vlad Rozov <[email protected]> Authored: Tue Feb 23 21:46:16 2016 -0800 Committer: Pramod Immaneni <[email protected]> Committed: Thu Feb 25 09:26:45 2016 -0800 ---------------------------------------------------------------------- .../datatorrent/bufferserver/internal/DataList.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/da2f0edb/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java index fa20aa2..0de7261 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java @@ -296,13 +296,13 @@ public class DataList @Override public void run() { - boolean atLeastOneListenerHasDataToSend; - do { - atLeastOneListenerHasDataToSend = false; - for (DataListener dl : all_listeners) { - atLeastOneListenerHasDataToSend |= dl.addedData(); - } - } while (atLeastOneListenerHasDataToSend); + boolean atLeastOneListenerHasDataToSend = false; + for (DataListener dl : all_listeners) { + atLeastOneListenerHasDataToSend |= dl.addedData(); + } + if (atLeastOneListenerHasDataToSend) { + future = autoFlushExecutor.submit(this); + } } }); }
