Repository: incubator-apex-core
Updated Branches:
  refs/heads/master 184ea5e2c -> 5e0d4b758


APEXCORE-353 Allow autoFlushExecutor to process jobs from other datalists when 
there is more data to send and physical nodes for the current datalist are not 
ready to get more data.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/5e0d4b75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/5e0d4b75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/5e0d4b75

Branch: refs/heads/master
Commit: 5e0d4b7588b5cba7273ff5d37705d18db4f6f139
Parents: 184ea5e
Author: Vlad Rozov <[email protected]>
Authored: Tue Feb 23 21:46:16 2016 -0800
Committer: Pramod Immaneni <[email protected]>
Committed: Thu Feb 25 09:28:26 2016 -0800

----------------------------------------------------------------------
 .../datatorrent/bufferserver/internal/DataList.java   | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/5e0d4b75/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
index fa20aa2..0de7261 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
@@ -296,13 +296,13 @@ public class DataList
         @Override
         public void run()
         {
-          boolean atLeastOneListenerHasDataToSend;
-          do {
-            atLeastOneListenerHasDataToSend = false;
-            for (DataListener dl : all_listeners) {
-              atLeastOneListenerHasDataToSend |= dl.addedData();
-            }
-          } while (atLeastOneListenerHasDataToSend);
+          boolean atLeastOneListenerHasDataToSend = false;
+          for (DataListener dl : all_listeners) {
+            atLeastOneListenerHasDataToSend |= dl.addedData();
+          }
+          if (atLeastOneListenerHasDataToSend) {
+            future = autoFlushExecutor.submit(this);
+          }
         }
       });
     }

Reply via email to