This is an automated email from the ASF dual-hosted git repository.

jeongyoon pushed a commit to branch 717-TGE
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git

commit 2e97984c62fc519466b40d64fa82929c51d444a7
Merge: 13080f3 ee830d5
Author: Jeongyoon Eo <[email protected]>
AuthorDate: Sun Mar 4 14:06:42 2018 +0900

    merge master

 bin/generate_javadocs.sh                           |   2 +-
 .../DuplicateEdgeGroupProperty.java                |  41 ++
 .../DuplicateEdgeGroupPropertyValue.java           |  99 ++++
 .../ir/executionproperty/ExecutionProperty.java    |   2 +-
 .../edu/snu/nemo/common/ir/vertex/LoopVertex.java  |  14 +
 .../edu/snu/nemo/common/test/ExampleTestUtil.java  |  50 ++
 .../beam/transform/GroupByKeyTransform.java        |   1 -
 .../nemo/compiler/frontend/spark/sql/Dataset.java  |  12 +-
 .../compiler/frontend/spark/sql/SparkSession.java  |  16 +-
 .../frontend/spark/transform/CollectTransform.java |   1 -
 .../spark/transform/ReduceByKeyTransform.java      |  18 +-
 .../annotating/DuplicateEdgeGroupSizePass.java     |  66 +++
 .../composite/LoopOptimizationCompositePass.java   |   4 +-
 .../compiletime/reshaping/LoopUnrollingPass.java   |   1 +
 conf/src/main/java/edu/snu/nemo/conf/JobConf.java  |   2 +-
 .../nemo/examples/beam/AlternatingLeastSquare.java |  27 +-
 .../beam/AlternatingLeastSquareITCase.java         |  38 +-
 .../snu/nemo/examples/beam/BroadcastITCase.java    |   5 +-
 .../snu/nemo/examples/beam/MapReduceITCase.java    |   2 +-
 .../beam/MultinomialLogisticRegressionITCase.java  |   2 +-
 examples/resources/sample_input_mr                 |   2 +-
 examples/resources/test_output_als                 | 627 +++++++++++++++++++++
 .../edu/snu/nemo/examples/spark/SparkITCase.java   |  15 +-
 .../pass/runtime/DataSkewRuntimePass.java          |   2 +-
 .../plan/physical/PhysicalPlanGenerator.java       |  35 ++
 .../nemo/runtime/executor/TaskGroupExecutor.java   |   2 +-
 .../runtime/executor/TaskGroupStateManager.java    |  68 +--
 .../runtime/executor/data/BlockManagerWorker.java  |  50 +-
 .../snu/nemo/runtime/executor/data/DataUtil.java   |  28 +-
 .../runtime/executor/datatransfer/InputReader.java |  24 +-
 .../executor/datatransfer/OutputWriter.java        |  21 +-
 .../nemo/runtime/master/BlockManagerMaster.java    | 144 +++--
 .../edu/snu/nemo/runtime/master/BlockMetadata.java |  29 +-
 .../optimizer/policy/PolicyBuilderTest.java        |   6 +-
 .../runtime/executor/TaskGroupExecutorTest.java    |  15 +-
 .../executor/datatransfer/DataTransferTest.java    | 183 ++++++
 .../runtime/master/BlockManagerMasterTest.java     |  32 +-
 37 files changed, 1467 insertions(+), 219 deletions(-)

diff --cc 
common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionProperty.java
index 075ef66,eeadd66..ef55daa
--- 
a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionProperty.java
+++ 
b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionProperty.java
@@@ -79,6 -80,6 +80,5 @@@ public abstract class ExecutionProperty
      Parallelism,
      ScheduleGroupIndex,
      StageId,
--
    }
  }
diff --cc common/src/main/java/edu/snu/nemo/common/test/ExampleTestUtil.java
index 1b4b8b8,eeec1a9..61b42ca
--- a/common/src/main/java/edu/snu/nemo/common/test/ExampleTestUtil.java
+++ b/common/src/main/java/edu/snu/nemo/common/test/ExampleTestUtil.java
@@@ -19,8 -19,11 +19,11 @@@ import java.io.IOException
  import java.nio.file.Files;
  import java.nio.file.Path;
  import java.nio.file.Paths;
 -import java.util.Arrays;
 -import java.util.List;
  import java.util.Set;
  import java.util.stream.Collectors;
++import java.util.Arrays;
++import java.util.List;
+ import java.util.stream.IntStream;
  
  /**
   * Test Utils for Examples.
diff --cc 
compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
index 7c6d00f,25e3fb9..0ea183d
--- 
a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
@@@ -28,10 -26,8 +28,9 @@@ import java.util.*
   * @param <I> input type.
   */
  public final class GroupByKeyTransform<I> implements Transform<I, KV<Object, 
List>> {
 +  private static final Logger LOG = 
LoggerFactory.getLogger(GroupByKeyTransform.class.getName());
- 
    private final Map<Object, List> keyToValues;
 -  private OutputCollector<KV<Object, List>> outputCollector;
 +  private Pipe<KV<Object, List>> pipe;
  
    /**
     * GroupByKey constructor.
diff --cc 
compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
index bc6c486,72d2f49..ef25601
--- 
a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
+++ 
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
@@@ -15,9 -15,9 +15,11 @@@
   */
  package edu.snu.nemo.compiler.frontend.spark.transform;
  
 -import edu.snu.nemo.common.ir.OutputCollector;
 +import edu.snu.nemo.common.ir.Pipe;
  import edu.snu.nemo.common.ir.vertex.transform.Transform;
  import org.apache.spark.api.java.function.Function2;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
  import scala.Tuple2;
  
  import java.util.*;
@@@ -28,9 -28,9 +30,11 @@@
   * @param <V> value type.
   */
  public final class ReduceByKeyTransform<K, V> implements Transform<Tuple2<K, 
V>, Tuple2<K, V>> {
++  private static final Logger LOG = 
LoggerFactory.getLogger(ReduceByKeyTransform.class.getName());
++
    private final Map<K, List<V>> keyToValues;
    private final Function2<V, V, V> func;
 -  private OutputCollector<Tuple2<K, V>> oc;
 +  private Pipe<Tuple2<K, V>> pipe;
  
    /**
     * Constructor.
@@@ -57,10 -56,10 +61,14 @@@
  
    @Override
    public void close() {
--    keyToValues.entrySet().stream().map(entry -> {
--      final V value = 
ReduceTransform.reduceIterator(entry.getValue().iterator(), func);
--      return new Tuple2<>(entry.getKey(), value);
-     }).forEach(pipe::emit);
 -    }).forEach(oc::emit);
--    keyToValues.clear();
++    if (keyToValues.isEmpty()) {
++      LOG.warn("Spark ReduceByKeyTransform received no data!");
++    } else {
++      keyToValues.entrySet().stream().map(entry -> {
++        final V value = 
ReduceTransform.reduceIterator(entry.getValue().iterator(), func);
++        return new Tuple2<>(entry.getKey(), value);
++      }).forEach(pipe::emit);
++      keyToValues.clear();
++    }
    }
  }
diff --cc 
examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
index e341604,5585cd7..2d96b6e
--- 
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
+++ 
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
@@@ -15,10 -15,12 +15,12 @@@
   */
  package edu.snu.nemo.examples.beam;
  
- import edu.snu.nemo.examples.beam.policy.PadoPolicyParallelismFive;
  import edu.snu.nemo.client.JobLauncher;
  import edu.snu.nemo.common.test.ArgBuilder;
- 
+ import edu.snu.nemo.common.test.ExampleTestUtil;
+ import edu.snu.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
 -import edu.snu.nemo.examples.beam.policy.PadoPolicyParallelsimFive;
++import edu.snu.nemo.examples.beam.policy.PadoPolicyParallelismFive;
+ import org.junit.After;
  import org.junit.Before;
  import org.junit.Test;
  import org.junit.runner.RunWith;
diff --cc 
examples/beam/src/test/java/edu/snu/nemo/examples/beam/BroadcastITCase.java
index 2770bf2,5dc7a11..b918960
--- 
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/BroadcastITCase.java
+++ 
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/BroadcastITCase.java
@@@ -20,7 -19,7 +19,7 @@@ import edu.snu.nemo.client.JobLauncher
  import edu.snu.nemo.common.test.ArgBuilder;
  import edu.snu.nemo.common.test.ExampleTestUtil;
  import edu.snu.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
- 
 -import edu.snu.nemo.examples.beam.policy.PadoPolicyParallelsimFive;
++import edu.snu.nemo.examples.beam.policy.PadoPolicyParallelismFive;
  import org.junit.After;
  import org.junit.Before;
  import org.junit.Test;
diff --cc 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
index 85d58ee,89284e2..cbc12e9
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
@@@ -775,4 -516,4 +775,4 @@@ public final class TaskGroupExecutor 
        }
      }
    }
--}
++}
diff --cc 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupStateManager.java
index 707dca0,3a1fe7f..e35eadc
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupStateManager.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupStateManager.java
@@@ -18,8 -18,9 +18,7 @@@ package edu.snu.nemo.runtime.executor
  import edu.snu.nemo.common.dag.DAG;
  import edu.snu.nemo.common.exception.UnknownExecutionStateException;
  import edu.snu.nemo.common.exception.UnknownFailureCauseException;
 -import edu.snu.nemo.common.StateMachine;
  import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
--import edu.snu.nemo.runtime.common.comm.ControlMessage;
  import edu.snu.nemo.runtime.common.message.MessageEnvironment;
  import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
  import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
@@@ -45,7 -47,20 +44,6 @@@ public final class TaskGroupStateManage
    private final int attemptIdx;
    private final String executorId;
    private final MetricCollector metricCollector;
 -
 -  /**
 -   * Used to track all task states of this task group, by keeping a map of 
logical task ids to their states.
 -   */
 -  private final Map<String, TaskState> logicalIdToTaskStates;
 -
 -  /**
 -   * Used to track task group completion status.
 -   * All task ids are added to the set when the this task group begins 
executing.
 -   * Each task id is removed upon completion,
 -   * therefore indicating the task group's completion when this set becomes 
empty.
 -   */
 -  private Set<String> currentTaskGroupTaskIds;
--
    private final PersistentConnectionToMasterMap 
persistentConnectionToMasterMap;
  
    public TaskGroupStateManager(final ScheduledTaskGroup scheduledTaskGroup,
diff --cc 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index 7898cf3,61dfeba..d80a8c9
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@@ -46,8 -45,6 +47,7 @@@ import org.slf4j.LoggerFactory
   * TODO #492: Modularize the data communication pattern.
   */
  public final class InputReader extends DataTransfer {
 +  private static final Logger LOG = 
LoggerFactory.getLogger(InputReader.class.getName());
- 
    private final int dstTaskIndex;
    private final BlockManagerWorker blockManagerWorker;
  
diff --cc 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
index ef5c976,f28d893..31ed98f
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
@@@ -368,4 -357,82 +357,83 @@@ public final class BlockManagerMaster 
        }
      }
    }
+ 
+   /**
+    * The handler of block location requests.
+    */
+   @VisibleForTesting
+   public static final class BlockLocationRequestHandler {
+     private final String blockId;
+     private final CompletableFuture<String> locationFuture;
+ 
+     /**
+      * Constructor.
+      *
+      * @param blockId the ID of the block.
+      */
+     BlockLocationRequestHandler(final String blockId) {
+       this.blockId = blockId;
+       this.locationFuture = new CompletableFuture<>();
+     }
+ 
+     /**
+      * Completes the block location future.
+      * If there is any pending request, replies with the completed location.
+      *
+      * @param location the location of the block.
+      */
+     void complete(final String location) {
+       locationFuture.complete(location);
+     }
+ 
+     /**
+      * Completes the block location future with failure.
+      * If there is any pending request, replies with the cause.
+      *
+      * @param throwable the cause of failure.
+      */
+     void completeExceptionally(final Throwable throwable) {
+       locationFuture.completeExceptionally(throwable);
+     }
+ 
+     /**
+      * Registers a request for the block location.
+      * If the location is already known, reply the location instantly.
+      *
+      * @param requestId      the ID of the block location request.
+      * @param messageContext the message context to reply.
+      */
+     void registerRequest(final long requestId,
+                          final MessageContext messageContext) {
+       final ControlMessage.BlockLocationInfoMsg.Builder infoMsgBuilder =
+           ControlMessage.BlockLocationInfoMsg.newBuilder()
+               .setRequestId(requestId)
+               .setBlockId(blockId);
+ 
+       locationFuture.whenComplete((location, throwable) -> {
+         if (throwable == null) {
+           infoMsgBuilder.setOwnerExecutorId(location);
+         } else {
+           infoMsgBuilder.setState(
+               convertBlockState(((AbsentBlockException) 
throwable).getState()));
+         }
+         messageContext.reply(
+             ControlMessage.Message.newBuilder()
+                 .setId(RuntimeIdGenerator.generateMessageId())
+                 
.setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID)
+                 .setType(ControlMessage.MessageType.BlockLocationInfo)
+                 .setBlockLocationInfoMsg(infoMsgBuilder.build())
+                 .build());
+       });
+     }
+ 
+     /**
+      * @return the future of the block location.
+      */
+     @VisibleForTesting
+     public Future<String> getLocationFuture() {
+       return locationFuture;
+     }
+   }
++>>>>>>> master
  }
diff --cc 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
index a7b810c,aae7b40..1abb2c8
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
@@@ -30,11 -29,10 +29,10 @@@ import javax.annotation.concurrent.Thre
   */
  @ThreadSafe
  final class BlockMetadata {
-   // Partition level metadata.
--  private static final Logger LOG = 
LoggerFactory.getLogger(BlockManagerMaster.class.getName());
++  private static final Logger LOG = 
LoggerFactory.getLogger(BlockMetadata.class.getName());
    private final String blockId;
    private final BlockState blockState;
-   private volatile CompletableFuture<String> locationFuture; // the future of 
the location of this block.
+   private volatile BlockManagerMaster.BlockLocationRequestHandler 
locationHandler;
  
    /**
     * Constructs the metadata for a block.
diff --cc 
tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskGroupExecutorTest.java
index 38f52f0,0209670..3f72a1d
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskGroupExecutorTest.java
+++ 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskGroupExecutorTest.java
@@@ -27,7 -27,7 +27,6 @@@ import edu.snu.nemo.common.ir.vertex.IR
  import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
  import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
  import edu.snu.nemo.runtime.common.plan.physical.*;
--import edu.snu.nemo.runtime.common.state.TaskState;
  import edu.snu.nemo.runtime.executor.MetricMessageSender;
  import edu.snu.nemo.runtime.executor.TaskGroupExecutor;
  import edu.snu.nemo.runtime.executor.TaskGroupStateManager;
diff --cc 
tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
index 5ef643d,b536372..7834620
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
+++ 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
@@@ -336,7 -373,149 +373,151 @@@ public final class DataTransferTest 
      }
    }
  
+   private void writeAndReadWithDuplicateData(final BlockManagerWorker sender,
+                                              final BlockManagerWorker 
receiver,
+                                              final 
DataCommunicationPatternProperty.Value commPattern,
+                                              final DataStoreProperty.Value 
store) throws RuntimeException {
+     final int testIndex = TEST_INDEX.getAndIncrement();
+     final int testIndex2 = TEST_INDEX.getAndIncrement();
+     final String edgeId = String.format(EDGE_PREFIX_TEMPLATE, testIndex);
+     final String edgeId2 = String.format(EDGE_PREFIX_TEMPLATE, testIndex2);
+     final Pair<IRVertex, IRVertex> verticesPair = setupVertices(edgeId, 
edgeId2, sender, receiver);
+     final IRVertex srcVertex = verticesPair.left();
+     final IRVertex dstVertex = verticesPair.right();
+ 
+     // Edge setup
+     final IREdge dummyIREdge = new IREdge(commPattern, srcVertex, dstVertex, 
CODER);
+     dummyIREdge.setProperty(KeyExtractorProperty.of((element -> element)));
+     final ExecutionPropertyMap edgeProperties = 
dummyIREdge.getExecutionProperties();
+     edgeProperties.put(DataCommunicationPatternProperty.of(commPattern));
+     
edgeProperties.put(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
+     edgeProperties.put(DuplicateEdgeGroupProperty.of(new 
DuplicateEdgeGroupPropertyValue("dummy")));
+     final DuplicateEdgeGroupPropertyValue duplicateDataProperty = 
edgeProperties.get(ExecutionProperty.Key.DuplicateEdgeGroup);
+     duplicateDataProperty.setRepresentativeEdgeId(edgeId);
+     duplicateDataProperty.setGroupSize(2);
+ 
+     edgeProperties.put(DataStoreProperty.of(store));
+     
edgeProperties.put(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
+     final RuntimeEdge dummyEdge, dummyEdge2;
+ 
+     final IRVertex srcMockVertex = mock(IRVertex.class);
+     final IRVertex dstMockVertex = mock(IRVertex.class);
+     final PhysicalStage srcStage = setupStages("srcStage-" + testIndex);
+     final PhysicalStage dstStage = setupStages("dstStage-" + testIndex);
+     dummyEdge = new PhysicalStageEdge(edgeId, edgeProperties, srcMockVertex, 
dstMockVertex,
+         srcStage, dstStage, CODER, false);
+     final IRVertex dstMockVertex2 = mock(IRVertex.class);
+     final PhysicalStage dstStage2 = setupStages("dstStage-" + testIndex2);
+     dummyEdge2 = new PhysicalStageEdge(edgeId2, edgeProperties, 
srcMockVertex, dstMockVertex2,
+         srcStage, dstStage2, CODER, false);
+     // Initialize states in Master
+     srcStage.getTaskGroupIds().forEach(srcTaskGroupId -> {
+       final String blockId = RuntimeIdGenerator.generateBlockId(
+           edgeId, RuntimeIdGenerator.getIndexFromTaskGroupId(srcTaskGroupId));
+       master.initializeState(blockId, srcTaskGroupId);
+       final String blockId2 = RuntimeIdGenerator.generateBlockId(
+           edgeId2, 
RuntimeIdGenerator.getIndexFromTaskGroupId(srcTaskGroupId));
+       master.initializeState(blockId2, srcTaskGroupId);
+       master.onProducerTaskGroupScheduled(srcTaskGroupId);
+     });
+ 
+     // Write
+     final List<List> dataWrittenList = new ArrayList<>();
+     IntStream.range(0, PARALLELISM_TEN).forEach(srcTaskIndex -> {
+       final List dataWritten = getRangedNumList(0, PARALLELISM_TEN);
+       final OutputWriter writer = new OutputWriter(HASH_RANGE_MULTIPLIER, 
srcTaskIndex, srcVertex.getId(), dstVertex,
+           dummyEdge, sender);
 -      writer.write(dataWritten);
++      dataWritten.iterator().forEachRemaining(writer::writeElement);
++      writer.write();
+       writer.close();
+       dataWrittenList.add(dataWritten);
+ 
+       final OutputWriter writer2 = new OutputWriter(HASH_RANGE_MULTIPLIER, 
srcTaskIndex, srcVertex.getId(), dstVertex,
+           dummyEdge2, sender);
 -      writer2.write(dataWritten);
++      dataWritten.iterator().forEachRemaining(writer::writeElement);
++      writer2.write();
+       writer2.close();
+     });
+ 
+     // Read
+     final List<List> dataReadList = new ArrayList<>();
+     final List<List> dataReadList2 = new ArrayList<>();
+     IntStream.range(0, PARALLELISM_TEN).forEach(dstTaskIndex -> {
+       final InputReader reader =
+           new InputReader(dstTaskIndex, srcVertex, dummyEdge, receiver);
+       final InputReader reader2 =
+           new InputReader(dstTaskIndex, srcVertex, dummyEdge2, receiver);
+ 
+       if 
(DataCommunicationPatternProperty.Value.OneToOne.equals(commPattern)) {
+         assertEquals(1, reader.getSourceParallelism());
+       } else {
+         assertEquals(PARALLELISM_TEN, reader.getSourceParallelism());
+       }
+ 
+       if 
(DataCommunicationPatternProperty.Value.OneToOne.equals(commPattern)) {
+         assertEquals(1, reader2.getSourceParallelism());
+       } else {
+         assertEquals(PARALLELISM_TEN, reader2.getSourceParallelism());
+       }
+ 
+       final List dataRead = new ArrayList<>();
+       try {
+         
InputReader.combineFutures(reader.read()).forEachRemaining(dataRead::add);
+       } catch (final Exception e) {
+         throw new RuntimeException(e);
+       }
+       dataReadList.add(dataRead);
+ 
+       final List dataRead2 = new ArrayList<>();
+       try {
+         
InputReader.combineFutures(reader2.read()).forEachRemaining(dataRead2::add);
+       } catch (final Exception e) {
+         throw new RuntimeException(e);
+       }
+       dataReadList2.add(dataRead2);
+     });
+ 
+     // Compare (should be the same)
+     final List flattenedWrittenData = flatten(dataWrittenList);
+     final List flattenedWrittenData2 = flatten(dataWrittenList);
+     final List flattenedReadData = flatten(dataReadList);
+     final List flattenedReadData2 = flatten(dataReadList2);
+     if (DataCommunicationPatternProperty.Value.BroadCast.equals(commPattern)) 
{
+       final List broadcastedWrittenData = new ArrayList<>();
+       final List broadcastedWrittenData2 = new ArrayList<>();
+       IntStream.range(0, PARALLELISM_TEN).forEach(i -> 
broadcastedWrittenData.addAll(flattenedWrittenData));
+       IntStream.range(0, PARALLELISM_TEN).forEach(i -> 
broadcastedWrittenData2.addAll(flattenedWrittenData2));
+       assertEquals(broadcastedWrittenData.size(), flattenedReadData.size());
+       flattenedReadData.forEach(rData -> 
assertTrue(broadcastedWrittenData.remove(rData)));
+       flattenedReadData2.forEach(rData -> 
assertTrue(broadcastedWrittenData2.remove(rData)));
+     } else {
+       assertEquals(flattenedWrittenData.size(), flattenedReadData.size());
+       flattenedReadData.forEach(rData -> 
assertTrue(flattenedWrittenData.remove(rData)));
+       flattenedReadData2.forEach(rData -> 
assertTrue(flattenedWrittenData2.remove(rData)));
+     }
+   }
+ 
+   private Pair<IRVertex, IRVertex> setupVertices(final String edgeId,
+                                                  final BlockManagerWorker 
sender,
+                                                  final BlockManagerWorker 
receiver) {
+     serializerManagers.get(sender).register(edgeId, CODER, new 
ExecutionPropertyMap(""));
+     serializerManagers.get(receiver).register(edgeId, CODER, new 
ExecutionPropertyMap(""));
+ 
+     // Src setup
+     final SourceVertex srcVertex = new 
EmptyComponents.EmptySourceVertex("Source");
+     final ExecutionPropertyMap srcVertexProperties = 
srcVertex.getExecutionProperties();
+     srcVertexProperties.put(ParallelismProperty.of(PARALLELISM_TEN));
+ 
+     // Dst setup
+     final SourceVertex dstVertex = new 
EmptyComponents.EmptySourceVertex("Destination");
+     final ExecutionPropertyMap dstVertexProperties = 
dstVertex.getExecutionProperties();
+     dstVertexProperties.put(ParallelismProperty.of(PARALLELISM_TEN));
+ 
+     return Pair.of(srcVertex, dstVertex);
+   }
+ 
    private Pair<IRVertex, IRVertex> setupVertices(final String edgeId,
+                                                  final String edgeId2,
                                                   final BlockManagerWorker 
sender,
                                                   final BlockManagerWorker 
receiver) {
      serializerManagers.get(sender).register(edgeId, CODER, new 
ExecutionPropertyMap(""));
diff --cc 
tests/src/test/java/edu/snu/nemo/tests/runtime/master/BlockManagerMasterTest.java
index 6d34f18,2e3bf30..cac9e61
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/BlockManagerMasterTest.java
+++ 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/BlockManagerMasterTest.java
@@@ -27,9 -27,9 +27,8 @@@ import org.apache.reef.tang.Tang
  import org.junit.Before;
  import org.junit.Test;
  
- import java.util.concurrent.CompletableFuture;
  import java.util.concurrent.ExecutionException;
- 
+ import java.util.concurrent.Future;
 -
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertFalse;
  import static org.junit.Assert.assertTrue;

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to