This is an automated email from the ASF dual-hosted git repository.

jeongyoon pushed a commit to branch 717-TGE
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git

commit 442459e9d36abde61ce27d91bd81dae979d2648b
Author: Jeongyoon Eo <[email protected]>
AuthorDate: Sun Mar 4 21:23:23 2018 +0900

    Fix OutputWriter.write() to drain outputQueue (remove elements) instead of only iterating over it
---
 .../edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java     | 5 ++++-
 .../edu/snu/nemo/tests/runtime/executor/TaskGroupExecutorTest.java   | 2 +-
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
index 245ab96..f507985 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -80,7 +80,10 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
   public void write() {
     // Aggregate element to form the inter-Stage data.
     List<Object> dataToWrite = new ArrayList<>();
-    outputQueue.iterator().forEachRemaining(dataToWrite::add);
+    while(outputQueue.size() > 0) {
+      Object output = outputQueue.remove();
+      dataToWrite.add(output);
+    }
 
     final Boolean isDataSizeMetricCollectionEdge = MetricCollectionProperty.Value.DataSkewRuntimePass
         .equals(runtimeEdge.getProperty(ExecutionProperty.Key.MetricCollection));
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskGroupExecutorTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskGroupExecutorTest.java
index 3f72a1d..ec60605 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskGroupExecutorTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskGroupExecutorTest.java
@@ -241,7 +241,7 @@ public final class TaskGroupExecutorTest {
         @Override
         public Object answer(final InvocationOnMock invocationOnMock) throws Throwable {
           final Object[] args = invocationOnMock.getArguments();
-          final Iterable dataToWrite = (Iterable) args[0];
+          final Object dataToWrite = args[0];
           taskIdToOutputData.computeIfAbsent(dstTask.getId(), emptyTaskId -> new ArrayList<>());
           taskIdToOutputData.get(dstTask.getId()).add(dataToWrite);
           return null;

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to