Repository: incubator-apex-core
Updated Branches:
  refs/heads/release-3.2 5b66f7a0a -> be34b5e86


APEXCORE-353 Allow autoFlushExecutor to process jobs from other datalists when 
there is more data to send and physical nodes for the current datalist are not 
ready to get more data.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/a9b4d3e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/a9b4d3e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/a9b4d3e6

Branch: refs/heads/release-3.2
Commit: a9b4d3e618c0bcd5090e4d36222983b63ceab9ab
Parents: fd2a7e8
Author: Vlad Rozov <[email protected]>
Authored: Tue Feb 23 21:46:16 2016 -0800
Committer: Vlad Rozov <[email protected]>
Committed: Tue Feb 23 21:46:16 2016 -0800

----------------------------------------------------------------------
 .../datatorrent/bufferserver/internal/DataList.java   | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/a9b4d3e6/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
----------------------------------------------------------------------
diff --git 
a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
 
b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
index 1f6c273..f39eca1 100644
--- 
a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
+++ 
b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
@@ -287,13 +287,13 @@ public class DataList
         @Override
         public void run()
         {
-          boolean atLeastOneListenerHasDataToSend;
-          do {
-            atLeastOneListenerHasDataToSend = false;
-            for (DataListener dl : all_listeners) {
-              atLeastOneListenerHasDataToSend |= dl.addedData();
-            }
-          } while (atLeastOneListenerHasDataToSend);
+          boolean atLeastOneListenerHasDataToSend = false;
+          for (DataListener dl : all_listeners) {
+            atLeastOneListenerHasDataToSend |= dl.addedData();
+          }
+          if (atLeastOneListenerHasDataToSend) {
+            future = autoFlushExecutor.submit(this);
+          }
         }
       });
     }

Reply via email to