This is an automated email from the ASF dual-hosted git repository. jeongyoon pushed a commit to branch 717-TGE in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
commit 442459e9d36abde61ce27d91bd81dae979d2648b
Author: Jeongyoon Eo <[email protected]>
AuthorDate: Sun Mar 4 21:23:23 2018 +0900

    fix OutputWriter to remove from queue
---
 .../edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java   | 5 ++++-
 .../edu/snu/nemo/tests/runtime/executor/TaskGroupExecutorTest.java | 2 +-
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
index 245ab96..f507985 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -80,7 +80,10 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
   public void write() {
     // Aggregate element to form the inter-Stage data.
     List<Object> dataToWrite = new ArrayList<>();
-    outputQueue.iterator().forEachRemaining(dataToWrite::add);
+    while(outputQueue.size() > 0) {
+      Object output = outputQueue.remove();
+      dataToWrite.add(output);
+    }
 
     final Boolean isDataSizeMetricCollectionEdge = MetricCollectionProperty.Value.DataSkewRuntimePass
         .equals(runtimeEdge.getProperty(ExecutionProperty.Key.MetricCollection));
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskGroupExecutorTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskGroupExecutorTest.java
index 3f72a1d..ec60605 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskGroupExecutorTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskGroupExecutorTest.java
@@ -241,7 +241,7 @@ public final class TaskGroupExecutorTest {
         @Override
         public Object answer(final InvocationOnMock invocationOnMock) throws Throwable {
           final Object[] args = invocationOnMock.getArguments();
-          final Iterable dataToWrite = (Iterable) args[0];
+          final Object dataToWrite = args[0];
           taskIdToOutputData.computeIfAbsent(dstTask.getId(), emptyTaskId -> new ArrayList<>());
           taskIdToOutputData.get(dstTask.getId()).add(dataToWrite);
           return null;

-- 
To stop receiving notification emails like this one, please contact
[email protected].
