This is an automated email from the ASF dual-hosted git repository. jeongyoon pushed a commit to branch 717-TGE in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
commit 2e97984c62fc519466b40d64fa82929c51d444a7 Merge: 13080f3 ee830d5 Author: Jeongyoon Eo <[email protected]> AuthorDate: Sun Mar 4 14:06:42 2018 +0900 merge master bin/generate_javadocs.sh | 2 +- .../DuplicateEdgeGroupProperty.java | 41 ++ .../DuplicateEdgeGroupPropertyValue.java | 99 ++++ .../ir/executionproperty/ExecutionProperty.java | 2 +- .../edu/snu/nemo/common/ir/vertex/LoopVertex.java | 14 + .../edu/snu/nemo/common/test/ExampleTestUtil.java | 50 ++ .../beam/transform/GroupByKeyTransform.java | 1 - .../nemo/compiler/frontend/spark/sql/Dataset.java | 12 +- .../compiler/frontend/spark/sql/SparkSession.java | 16 +- .../frontend/spark/transform/CollectTransform.java | 1 - .../spark/transform/ReduceByKeyTransform.java | 18 +- .../annotating/DuplicateEdgeGroupSizePass.java | 66 +++ .../composite/LoopOptimizationCompositePass.java | 4 +- .../compiletime/reshaping/LoopUnrollingPass.java | 1 + conf/src/main/java/edu/snu/nemo/conf/JobConf.java | 2 +- .../nemo/examples/beam/AlternatingLeastSquare.java | 27 +- .../beam/AlternatingLeastSquareITCase.java | 38 +- .../snu/nemo/examples/beam/BroadcastITCase.java | 5 +- .../snu/nemo/examples/beam/MapReduceITCase.java | 2 +- .../beam/MultinomialLogisticRegressionITCase.java | 2 +- examples/resources/sample_input_mr | 2 +- examples/resources/test_output_als | 627 +++++++++++++++++++++ .../edu/snu/nemo/examples/spark/SparkITCase.java | 15 +- .../pass/runtime/DataSkewRuntimePass.java | 2 +- .../plan/physical/PhysicalPlanGenerator.java | 35 ++ .../nemo/runtime/executor/TaskGroupExecutor.java | 2 +- .../runtime/executor/TaskGroupStateManager.java | 68 +-- .../runtime/executor/data/BlockManagerWorker.java | 50 +- .../snu/nemo/runtime/executor/data/DataUtil.java | 28 +- .../runtime/executor/datatransfer/InputReader.java | 24 +- .../executor/datatransfer/OutputWriter.java | 21 +- .../nemo/runtime/master/BlockManagerMaster.java | 144 +++-- .../edu/snu/nemo/runtime/master/BlockMetadata.java | 29 +- .../optimizer/policy/PolicyBuilderTest.java | 6 +- .../runtime/executor/TaskGroupExecutorTest.java | 15 +- .../executor/datatransfer/DataTransferTest.java | 183 ++++++ .../runtime/master/BlockManagerMasterTest.java | 32 +- 37 files changed, 1467 insertions(+), 219 deletions(-) diff --cc common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionProperty.java index 075ef66,eeadd66..ef55daa --- a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionProperty.java +++ b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionProperty.java @@@ -79,6 -80,6 +80,5 @@@ public abstract class ExecutionProperty Parallelism, ScheduleGroupIndex, StageId, -- } } diff --cc common/src/main/java/edu/snu/nemo/common/test/ExampleTestUtil.java index 1b4b8b8,eeec1a9..61b42ca --- a/common/src/main/java/edu/snu/nemo/common/test/ExampleTestUtil.java +++ b/common/src/main/java/edu/snu/nemo/common/test/ExampleTestUtil.java @@@ -19,8 -19,11 +19,11 @@@ import java.io.IOException import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.Arrays; -import java.util.List; import java.util.Set; import java.util.stream.Collectors; ++import java.util.Arrays; ++import java.util.List; + import java.util.stream.IntStream; /** * Test Utils for Examples. diff --cc compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java index 7c6d00f,25e3fb9..0ea183d --- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java +++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java @@@ -28,10 -26,8 +28,9 @@@ import java.util.* * @param <I> input type. */ public final class GroupByKeyTransform<I> implements Transform<I, KV<Object, List>> { + private static final Logger LOG = LoggerFactory.getLogger(GroupByKeyTransform.class.getName()); - private final Map<Object, List> keyToValues; - private OutputCollector<KV<Object, List>> outputCollector; + private Pipe<KV<Object, List>> pipe; /** * GroupByKey constructor. diff --cc compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java index bc6c486,72d2f49..ef25601 --- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java +++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java @@@ -15,9 -15,9 +15,11 @@@ */ package edu.snu.nemo.compiler.frontend.spark.transform; -import edu.snu.nemo.common.ir.OutputCollector; +import edu.snu.nemo.common.ir.Pipe; import edu.snu.nemo.common.ir.vertex.transform.Transform; import org.apache.spark.api.java.function.Function2; ++import org.slf4j.Logger; ++import org.slf4j.LoggerFactory; import scala.Tuple2; import java.util.*; @@@ -28,9 -28,9 +30,11 @@@ * @param <V> value type. */ public final class ReduceByKeyTransform<K, V> implements Transform<Tuple2<K, V>, Tuple2<K, V>> { ++ private static final Logger LOG = LoggerFactory.getLogger(ReduceByKeyTransform.class.getName()); ++ private final Map<K, List<V>> keyToValues; private final Function2<V, V, V> func; - private OutputCollector<Tuple2<K, V>> oc; + private Pipe<Tuple2<K, V>> pipe; /** * Constructor. @@@ -57,10 -56,10 +61,14 @@@ @Override public void close() { -- keyToValues.entrySet().stream().map(entry -> { -- final V value = ReduceTransform.reduceIterator(entry.getValue().iterator(), func); -- return new Tuple2<>(entry.getKey(), value); - }).forEach(pipe::emit); - }).forEach(oc::emit); -- keyToValues.clear(); ++ if (keyToValues.isEmpty()) { ++ LOG.warn("Spark ReduceByKeyTransform received no data!"); ++ } else { ++ keyToValues.entrySet().stream().map(entry -> { ++ final V value = ReduceTransform.reduceIterator(entry.getValue().iterator(), func); ++ return new Tuple2<>(entry.getKey(), value); ++ }).forEach(pipe::emit); ++ keyToValues.clear(); ++ } } } diff --cc examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java index e341604,5585cd7..2d96b6e --- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java +++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java @@@ -15,10 -15,12 +15,12 @@@ */ package edu.snu.nemo.examples.beam; - import edu.snu.nemo.examples.beam.policy.PadoPolicyParallelismFive; import edu.snu.nemo.client.JobLauncher; import edu.snu.nemo.common.test.ArgBuilder; - + import edu.snu.nemo.common.test.ExampleTestUtil; + import edu.snu.nemo.examples.beam.policy.DefaultPolicyParallelismFive; -import edu.snu.nemo.examples.beam.policy.PadoPolicyParallelsimFive; ++import edu.snu.nemo.examples.beam.policy.PadoPolicyParallelismFive; + import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; diff --cc examples/beam/src/test/java/edu/snu/nemo/examples/beam/BroadcastITCase.java index 2770bf2,5dc7a11..b918960 --- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/BroadcastITCase.java +++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/BroadcastITCase.java @@@ -20,7 -19,7 +19,7 @@@ import edu.snu.nemo.client.JobLauncher import edu.snu.nemo.common.test.ArgBuilder; import edu.snu.nemo.common.test.ExampleTestUtil; import edu.snu.nemo.examples.beam.policy.DefaultPolicyParallelismFive; - -import edu.snu.nemo.examples.beam.policy.PadoPolicyParallelsimFive; ++import edu.snu.nemo.examples.beam.policy.PadoPolicyParallelismFive; import org.junit.After; import org.junit.Before; import org.junit.Test; diff --cc runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java index 85d58ee,89284e2..cbc12e9 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java @@@ -775,4 -516,4 +775,4 @@@ public final class TaskGroupExecutor } } } --} ++} diff --cc runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupStateManager.java index 707dca0,3a1fe7f..e35eadc --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupStateManager.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupStateManager.java @@@ -18,8 -18,9 +18,7 @@@ package edu.snu.nemo.runtime.executor import edu.snu.nemo.common.dag.DAG; import edu.snu.nemo.common.exception.UnknownExecutionStateException; import edu.snu.nemo.common.exception.UnknownFailureCauseException; -import edu.snu.nemo.common.StateMachine; import edu.snu.nemo.runtime.common.RuntimeIdGenerator; --import edu.snu.nemo.runtime.common.comm.ControlMessage; import edu.snu.nemo.runtime.common.message.MessageEnvironment; import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap; import edu.snu.nemo.runtime.common.plan.RuntimeEdge; @@@ -45,7 -47,20 +44,6 @@@ public final class TaskGroupStateManage private final int attemptIdx; private final String executorId; private final MetricCollector metricCollector; - - /** - * Used to track all task states of this task group, by keeping a map of logical task ids to their states. - */ - private final Map<String, TaskState> logicalIdToTaskStates; - - /** - * Used to track task group completion status. - * All task ids are added to the set when the this task group begins executing. - * Each task id is removed upon completion, - * therefore indicating the task group's completion when this set becomes empty. - */ - private Set<String> currentTaskGroupTaskIds; -- private final PersistentConnectionToMasterMap persistentConnectionToMasterMap; public TaskGroupStateManager(final ScheduledTaskGroup scheduledTaskGroup, diff --cc runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java index 7898cf3,61dfeba..d80a8c9 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java @@@ -46,8 -45,6 +47,7 @@@ import org.slf4j.LoggerFactory * TODO #492: Modularize the data communication pattern. */ public final class InputReader extends DataTransfer { + private static final Logger LOG = LoggerFactory.getLogger(InputReader.class.getName()); - private final int dstTaskIndex; private final BlockManagerWorker blockManagerWorker; diff --cc runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java index ef5c976,f28d893..31ed98f --- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java +++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java @@@ -368,4 -357,82 +357,83 @@@ public final class BlockManagerMaster } } } + + /** + * The handler of block location requests. + */ + @VisibleForTesting + public static final class BlockLocationRequestHandler { + private final String blockId; + private final CompletableFuture<String> locationFuture; + + /** + * Constructor. + * + * @param blockId the ID of the block. + */ + BlockLocationRequestHandler(final String blockId) { + this.blockId = blockId; + this.locationFuture = new CompletableFuture<>(); + } + + /** + * Completes the block location future. + * If there is any pending request, replies with the completed location. + * + * @param location the location of the block. + */ + void complete(final String location) { + locationFuture.complete(location); + } + + /** + * Completes the block location future with failure. + * If there is any pending request, replies with the cause. + * + * @param throwable the cause of failure. + */ + void completeExceptionally(final Throwable throwable) { + locationFuture.completeExceptionally(throwable); + } + + /** + * Registers a request for the block location. + * If the location is already known, reply the location instantly. + * + * @param requestId the ID of the block location request. + * @param messageContext the message context to reply. + */ + void registerRequest(final long requestId, + final MessageContext messageContext) { + final ControlMessage.BlockLocationInfoMsg.Builder infoMsgBuilder = + ControlMessage.BlockLocationInfoMsg.newBuilder() + .setRequestId(requestId) + .setBlockId(blockId); + + locationFuture.whenComplete((location, throwable) -> { + if (throwable == null) { + infoMsgBuilder.setOwnerExecutorId(location); + } else { + infoMsgBuilder.setState( + convertBlockState(((AbsentBlockException) throwable).getState())); + } + messageContext.reply( + ControlMessage.Message.newBuilder() + .setId(RuntimeIdGenerator.generateMessageId()) + .setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID) + .setType(ControlMessage.MessageType.BlockLocationInfo) + .setBlockLocationInfoMsg(infoMsgBuilder.build()) + .build()); + }); + } + + /** + * @return the future of the block location. + */ + @VisibleForTesting + public Future<String> getLocationFuture() { + return locationFuture; + } + } ++>>>>>>> master } diff --cc runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java index a7b810c,aae7b40..1abb2c8 --- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java +++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java @@@ -30,11 -29,10 +29,10 @@@ import javax.annotation.concurrent.Thre */ @ThreadSafe final class BlockMetadata { - // Partition level metadata. -- private static final Logger LOG = LoggerFactory.getLogger(BlockManagerMaster.class.getName()); ++ private static final Logger LOG = LoggerFactory.getLogger(BlockMetadata.class.getName()); private final String blockId; private final BlockState blockState; - private volatile CompletableFuture<String> locationFuture; // the future of the location of this block. + private volatile BlockManagerMaster.BlockLocationRequestHandler locationHandler; /** * Constructs the metadata for a block. diff --cc tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskGroupExecutorTest.java index 38f52f0,0209670..3f72a1d --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskGroupExecutorTest.java +++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskGroupExecutorTest.java @@@ -27,7 -27,7 +27,6 @@@ import edu.snu.nemo.common.ir.vertex.IR import edu.snu.nemo.runtime.common.RuntimeIdGenerator; import edu.snu.nemo.runtime.common.plan.RuntimeEdge; import edu.snu.nemo.runtime.common.plan.physical.*; --import edu.snu.nemo.runtime.common.state.TaskState; import edu.snu.nemo.runtime.executor.MetricMessageSender; import edu.snu.nemo.runtime.executor.TaskGroupExecutor; import edu.snu.nemo.runtime.executor.TaskGroupStateManager; diff --cc tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java index 5ef643d,b536372..7834620 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java +++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java @@@ -336,7 -373,149 +373,151 @@@ public final class DataTransferTest } } + private void writeAndReadWithDuplicateData(final BlockManagerWorker sender, + final BlockManagerWorker receiver, + final DataCommunicationPatternProperty.Value commPattern, + final DataStoreProperty.Value store) throws RuntimeException { + final int testIndex = TEST_INDEX.getAndIncrement(); + final int testIndex2 = TEST_INDEX.getAndIncrement(); + final String edgeId = String.format(EDGE_PREFIX_TEMPLATE, testIndex); + final String edgeId2 = String.format(EDGE_PREFIX_TEMPLATE, testIndex2); + final Pair<IRVertex, IRVertex> verticesPair = setupVertices(edgeId, edgeId2, sender, receiver); + final IRVertex srcVertex = verticesPair.left(); + final IRVertex dstVertex = verticesPair.right(); + + // Edge setup + final IREdge dummyIREdge = new IREdge(commPattern, srcVertex, dstVertex, CODER); + dummyIREdge.setProperty(KeyExtractorProperty.of((element -> element))); + final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties(); + edgeProperties.put(DataCommunicationPatternProperty.of(commPattern)); + edgeProperties.put(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner)); + edgeProperties.put(DuplicateEdgeGroupProperty.of(new DuplicateEdgeGroupPropertyValue("dummy"))); + final DuplicateEdgeGroupPropertyValue duplicateDataProperty = edgeProperties.get(ExecutionProperty.Key.DuplicateEdgeGroup); + duplicateDataProperty.setRepresentativeEdgeId(edgeId); + duplicateDataProperty.setGroupSize(2); + + edgeProperties.put(DataStoreProperty.of(store)); + edgeProperties.put(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep)); + final RuntimeEdge dummyEdge, dummyEdge2; + + final IRVertex srcMockVertex = mock(IRVertex.class); + final IRVertex dstMockVertex = mock(IRVertex.class); + final PhysicalStage srcStage = setupStages("srcStage-" + testIndex); + final PhysicalStage dstStage = setupStages("dstStage-" + testIndex); + dummyEdge = new PhysicalStageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex, + srcStage, dstStage, CODER, false); + final IRVertex dstMockVertex2 = mock(IRVertex.class); + final PhysicalStage dstStage2 = setupStages("dstStage-" + testIndex2); + dummyEdge2 = new PhysicalStageEdge(edgeId2, edgeProperties, srcMockVertex, dstMockVertex2, + srcStage, dstStage2, CODER, false); + // Initialize states in Master + srcStage.getTaskGroupIds().forEach(srcTaskGroupId -> { + final String blockId = RuntimeIdGenerator.generateBlockId( + edgeId, RuntimeIdGenerator.getIndexFromTaskGroupId(srcTaskGroupId)); + master.initializeState(blockId, srcTaskGroupId); + final String blockId2 = RuntimeIdGenerator.generateBlockId( + edgeId2, RuntimeIdGenerator.getIndexFromTaskGroupId(srcTaskGroupId)); + master.initializeState(blockId2, srcTaskGroupId); + master.onProducerTaskGroupScheduled(srcTaskGroupId); + }); + + // Write + final List<List> dataWrittenList = new ArrayList<>(); + IntStream.range(0, PARALLELISM_TEN).forEach(srcTaskIndex -> { + final List dataWritten = getRangedNumList(0, PARALLELISM_TEN); + final OutputWriter writer = new OutputWriter(HASH_RANGE_MULTIPLIER, srcTaskIndex, srcVertex.getId(), dstVertex, + dummyEdge, sender); - writer.write(dataWritten); ++ dataWritten.iterator().forEachRemaining(writer::writeElement); ++ writer.write(); + writer.close(); + dataWrittenList.add(dataWritten); + + final OutputWriter writer2 = new OutputWriter(HASH_RANGE_MULTIPLIER, srcTaskIndex, srcVertex.getId(), dstVertex, + dummyEdge2, sender); - writer2.write(dataWritten); ++ dataWritten.iterator().forEachRemaining(writer::writeElement); ++ writer2.write(); + writer2.close(); + }); + + // Read + final List<List> dataReadList = new ArrayList<>(); + final List<List> dataReadList2 = new ArrayList<>(); + IntStream.range(0, PARALLELISM_TEN).forEach(dstTaskIndex -> { + final InputReader reader = + new InputReader(dstTaskIndex, srcVertex, dummyEdge, receiver); + final InputReader reader2 = + new InputReader(dstTaskIndex, srcVertex, dummyEdge2, receiver); + + if (DataCommunicationPatternProperty.Value.OneToOne.equals(commPattern)) { + assertEquals(1, reader.getSourceParallelism()); + } else { + assertEquals(PARALLELISM_TEN, reader.getSourceParallelism()); + } + + if (DataCommunicationPatternProperty.Value.OneToOne.equals(commPattern)) { + assertEquals(1, reader2.getSourceParallelism()); + } else { + assertEquals(PARALLELISM_TEN, reader2.getSourceParallelism()); + } + + final List dataRead = new ArrayList<>(); + try { + InputReader.combineFutures(reader.read()).forEachRemaining(dataRead::add); + } catch (final Exception e) { + throw new RuntimeException(e); + } + dataReadList.add(dataRead); + + final List dataRead2 = new ArrayList<>(); + try { + InputReader.combineFutures(reader2.read()).forEachRemaining(dataRead2::add); + } catch (final Exception e) { + throw new RuntimeException(e); + } + dataReadList2.add(dataRead2); + }); + + // Compare (should be the same) + final List flattenedWrittenData = flatten(dataWrittenList); + final List flattenedWrittenData2 = flatten(dataWrittenList); + final List flattenedReadData = flatten(dataReadList); + final List flattenedReadData2 = flatten(dataReadList2); + if (DataCommunicationPatternProperty.Value.BroadCast.equals(commPattern)) { + final List broadcastedWrittenData = new ArrayList<>(); + final List broadcastedWrittenData2 = new ArrayList<>(); + IntStream.range(0, PARALLELISM_TEN).forEach(i -> broadcastedWrittenData.addAll(flattenedWrittenData)); + IntStream.range(0, PARALLELISM_TEN).forEach(i -> broadcastedWrittenData2.addAll(flattenedWrittenData2)); + assertEquals(broadcastedWrittenData.size(), flattenedReadData.size()); + flattenedReadData.forEach(rData -> assertTrue(broadcastedWrittenData.remove(rData))); + flattenedReadData2.forEach(rData -> assertTrue(broadcastedWrittenData2.remove(rData))); + } else { + assertEquals(flattenedWrittenData.size(), flattenedReadData.size()); + flattenedReadData.forEach(rData -> assertTrue(flattenedWrittenData.remove(rData))); + flattenedReadData2.forEach(rData -> assertTrue(flattenedWrittenData2.remove(rData))); + } + } + + private Pair<IRVertex, IRVertex> setupVertices(final String edgeId, + final BlockManagerWorker sender, + final BlockManagerWorker receiver) { + serializerManagers.get(sender).register(edgeId, CODER, new ExecutionPropertyMap("")); + serializerManagers.get(receiver).register(edgeId, CODER, new ExecutionPropertyMap("")); + + // Src setup + final SourceVertex srcVertex = new EmptyComponents.EmptySourceVertex("Source"); + final ExecutionPropertyMap srcVertexProperties = srcVertex.getExecutionProperties(); + srcVertexProperties.put(ParallelismProperty.of(PARALLELISM_TEN)); + + // Dst setup + final SourceVertex dstVertex = new EmptyComponents.EmptySourceVertex("Destination"); + final ExecutionPropertyMap dstVertexProperties = dstVertex.getExecutionProperties(); + dstVertexProperties.put(ParallelismProperty.of(PARALLELISM_TEN)); + + return Pair.of(srcVertex, dstVertex); + } + private Pair<IRVertex, IRVertex> setupVertices(final String edgeId, + final String edgeId2, final BlockManagerWorker sender, final BlockManagerWorker receiver) { serializerManagers.get(sender).register(edgeId, CODER, new ExecutionPropertyMap("")); diff --cc tests/src/test/java/edu/snu/nemo/tests/runtime/master/BlockManagerMasterTest.java index 6d34f18,2e3bf30..cac9e61 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/BlockManagerMasterTest.java +++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/BlockManagerMasterTest.java @@@ -27,9 -27,9 +27,8 @@@ import org.apache.reef.tang.Tang import org.junit.Before; import org.junit.Test; - import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; - + import java.util.concurrent.Future; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -- To stop receiving notification emails like this one, please contact [email protected].
