johnyangk closed pull request #100: [NEMO-178] Compile-time task cloning
URL: https://github.com/apache/incubator-nemo/pull/100
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff once such a pull request is merged,
the diff is reproduced below for the sake of provenance:

Because this is a foreign pull request (from a fork), GitHub does not show
its diff after the merge, so the diff is supplied below:

diff --git a/client/src/test/java/edu/snu/nemo/client/ClientEndpointTest.java 
b/client/src/test/java/edu/snu/nemo/client/ClientEndpointTest.java
index 744a992f5..d4892706f 100644
--- a/client/src/test/java/edu/snu/nemo/client/ClientEndpointTest.java
+++ b/client/src/test/java/edu/snu/nemo/client/ClientEndpointTest.java
@@ -71,7 +71,7 @@ public void testState() throws Exception {
 
     // Check finish.
     final List<String> tasks = 
physicalPlan.getStageDAG().getTopologicalSort().stream()
-        .flatMap(stage -> stage.getTaskIds().stream())
+        .flatMap(stage -> stage.getAllPossiblyClonedTaskIdsShuffled().stream())
         .collect(Collectors.toList());
     tasks.forEach(taskId -> planStateManager.onTaskStateChanged(taskId, 
TaskState.State.EXECUTING));
     tasks.forEach(taskId -> planStateManager.onTaskStateChanged(taskId, 
TaskState.State.COMPLETE));
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/IdManager.java 
b/common/src/main/java/edu/snu/nemo/common/ir/IdManager.java
index 934d04820..3a183aea3 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/IdManager.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/IdManager.java
@@ -35,13 +35,13 @@ private IdManager() {
    * @return a new operator ID.
    */
   public static String newVertexId() {
-    return "vertex" + (isDriver ? "-d" : "-") + vertexId.getAndIncrement();
+    return "vertex" + (isDriver ? "(d)" : "") + vertexId.getAndIncrement();
   }
   /**
    * @return a new edge ID.
    */
   public static String newEdgeId() {
-    return "edge" + (isDriver ? "-d" : "-") + edgeId.getAndIncrement();
+    return "edge" + (isDriver ? "(d)" : "") + edgeId.getAndIncrement();
   }
 
   /**
diff --git 
a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ClonedSchedulingProperty.java
 
b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ClonedSchedulingProperty.java
new file mode 100644
index 000000000..b5c3da091
--- /dev/null
+++ 
b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ClonedSchedulingProperty.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.vertex.executionproperty;
+
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+
+/**
+ * Specifies cloned execution of a vertex.
+ *
+ * Limitations of the current runtime implementation. (will be addressed in 
the future)
+ * (1) A vertex cannot have a larger # of clones than its parent vertex.
+ * (2) *ALL* of the clones are always scheduled, and the vertex is considered 
complete after all of the clones finish.
+ */
+public final class ClonedSchedulingProperty extends 
VertexExecutionProperty<Integer> {
+  /**
+   * Constructor.
+   * @param value value of the execution property.
+   */
+  private ClonedSchedulingProperty(final Integer value) {
+    super(value);
+  }
+
+  /**
+   * Static method exposing the constructor.
+   * @param value value of the new execution property.
+   * @return the newly created execution property.
+   */
+  public static ClonedSchedulingProperty of(final Integer value) {
+    if (value <= 0) {
+      throw new IllegalStateException(String.valueOf(value));
+    }
+    return new ClonedSchedulingProperty(value);
+  }
+}
diff --git 
a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
 
b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
index 3e933a2e5..696acffd3 100644
--- 
a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
+++ 
b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
@@ -19,7 +19,7 @@
 import edu.snu.nemo.compiler.backend.Backend;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
 import edu.snu.nemo.runtime.common.plan.Stage;
@@ -61,6 +61,6 @@ public PhysicalPlan compile(final DAG<IRVertex, IREdge> 
irDAG) throws Exception
   public PhysicalPlan compile(final DAG<IRVertex, IREdge> irDAG,
                               final PhysicalPlanGenerator 
physicalPlanGenerator) {
     final DAG<Stage, StageEdge> stageDAG = physicalPlanGenerator.apply(irDAG);
-    return new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(), 
stageDAG);
+    return new PhysicalPlan(RuntimeIdManager.generatePhysicalPlanId(), 
stageDAG);
   }
 }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ClonedSchedulingPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ClonedSchedulingPass.java
new file mode 100644
index 000000000..e7bf5b0ae
--- /dev/null
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ClonedSchedulingPass.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ClonedSchedulingProperty;
+
+import java.util.Collections;
+
+/**
+ * Set the ClonedScheduling property of all vertices to 2.
+ */
+public final class ClonedSchedulingPass extends AnnotatingPass {
+  /**
+   * Default constructor.
+   */
+  public ClonedSchedulingPass() {
+    super(ClonedSchedulingProperty.class, Collections.emptySet());
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    dag.getVertices().stream()
+        .filter(vertex -> !dag.getOutgoingEdgesOf(vertex.getId()).isEmpty()) 
// Don't clone sink vertices
+        .forEach(vertex -> vertex.setProperty(ClonedSchedulingProperty.of(2)));
+    return dag;
+  }
+}
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
index 1aa4dcf3a..0644a161f 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
@@ -271,7 +271,7 @@ private int getNextScheudleGroup(final Collection<IRVertex> 
irVertexCollection)
      * @param dst destination vertex.
      */
     ScheduleGroupEdge(final ScheduleGroup src, final ScheduleGroup dst) {
-      super(String.format("ScheduleGroupEdge-%d", nextScheduleGroupEdgeId++), 
src, dst);
+      super(String.format("ScheduleGroupEdge%d", nextScheduleGroupEdgeId++), 
src, dst);
     }
 
     @Override
diff --git 
a/compiler/test/src/test/java/edu/snu/nemo/compiler/backend/nemo/DAGConverterTest.java
 
b/compiler/test/src/test/java/edu/snu/nemo/compiler/backend/nemo/DAGConverterTest.java
index 52cc00675..38d5a2089 100644
--- 
a/compiler/test/src/test/java/edu/snu/nemo/compiler/backend/nemo/DAGConverterTest.java
+++ 
b/compiler/test/src/test/java/edu/snu/nemo/compiler/backend/nemo/DAGConverterTest.java
@@ -99,8 +99,8 @@ public void testSimplePlan() throws Exception {
     assertEquals(physicalDAG.getOutgoingEdgesOf(physicalStage1).size(), 1);
     assertEquals(physicalDAG.getOutgoingEdgesOf(physicalStage2).size(), 0);
 
-    assertEquals(physicalStage1.getTaskIds().size(), 3);
-    assertEquals(physicalStage2.getTaskIds().size(), 2);
+    assertEquals(physicalStage1.getAllPossiblyClonedTaskIdsShuffled().size(), 
3);
+    assertEquals(physicalStage2.getAllPossiblyClonedTaskIdsShuffled().size(), 
2);
   }
 
   @Test
diff --git 
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java 
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
index 7d291a923..b85dcd9bb 100644
--- 
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
+++ 
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
@@ -70,6 +70,15 @@ public void test() throws Exception {
         .build());
   }
 
+  @Test (timeout = TIMEOUT)
+  public void testClonedScheduling() throws Exception {
+    JobLauncher.main(builder
+        .addResourceJson(executorResourceFileName)
+        .addJobId(WordCountITCase.class.getSimpleName() + "_clonedscheduling")
+        
.addOptimizationPolicy(ClonedSchedulingPolicyParallelismFive.class.getCanonicalName())
+        .build());
+  }
+
   @Test (timeout = TIMEOUT)
   public void testLargeShuffle() throws Exception {
     JobLauncher.main(builder
diff --git 
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/ClonedSchedulingPolicyParallelismFive.java
 
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/ClonedSchedulingPolicyParallelismFive.java
new file mode 100644
index 000000000..6b2210d0a
--- /dev/null
+++ 
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/ClonedSchedulingPolicyParallelismFive.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.examples.beam.policy;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
+import 
edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ClonedSchedulingPass;
+import edu.snu.nemo.compiler.optimizer.policy.DefaultPolicy;
+import edu.snu.nemo.compiler.optimizer.policy.Policy;
+import edu.snu.nemo.compiler.optimizer.policy.PolicyImpl;
+import org.apache.reef.tang.Injector;
+
+import java.util.List;
+
+/**
+ * A default policy with cloning for tests.
+ */
+public final class ClonedSchedulingPolicyParallelismFive implements Policy {
+  private final Policy policy;
+
+  public ClonedSchedulingPolicyParallelismFive() {
+    final List<CompileTimePass> overwritingPasses = 
DefaultPolicy.BUILDER.getCompileTimePasses();
+    overwritingPasses.add(new ClonedSchedulingPass()); // CLONING!
+
+    this.policy = new PolicyImpl(
+        PolicyTestUtil.overwriteParallelism(5, overwritingPasses),
+        DefaultPolicy.BUILDER.getRuntimePasses());
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, 
IREdge> dag, final String dagDirectory)
+      throws Exception {
+    return this.policy.runCompileTimeOptimization(dag, dagDirectory);
+  }
+
+  @Override
+  public void registerRunTimeOptimizations(final Injector injector, final 
PubSubEventHandlerWrapper pubSubWrapper) {
+    this.policy.registerRunTimeOptimizations(injector, pubSubWrapper);
+  }
+}
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdManager.java
similarity index 70%
rename from 
runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
rename to 
runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdManager.java
index 2e2e61619..b25bd06d7 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdManager.java
@@ -21,19 +21,17 @@
 /**
  * ID Generator.
  */
-public final class RuntimeIdGenerator {
+public final class RuntimeIdManager {
   private static AtomicInteger physicalPlanIdGenerator = new AtomicInteger(0);
   private static AtomicInteger executorIdGenerator = new AtomicInteger(0);
   private static AtomicLong messageIdGenerator = new AtomicLong(1L);
   private static AtomicLong resourceSpecIdGenerator = new AtomicLong(0);
-  private static final String BLOCK_PREFIX = "Block-";
-  private static final String BLOCK_ID_SPLITTER = "_";
-  private static final String TASK_INFIX = "-Task-";
+  private static final String SPLITTER = "-";
 
   /**
    * Private constructor which will not be used.
    */
-  private RuntimeIdGenerator() {
+  private RuntimeIdManager() {
   }
 
 
@@ -45,7 +43,7 @@ private RuntimeIdGenerator() {
    * @return the generated ID
    */
   public static String generatePhysicalPlanId() {
-    return "Plan-" + physicalPlanIdGenerator.getAndIncrement();
+    return "Plan" + physicalPlanIdGenerator.getAndIncrement();
   }
 
   /**
@@ -55,7 +53,7 @@ public static String generatePhysicalPlanId() {
    * @return the generated ID
    */
   public static String generateStageEdgeId(final String irEdgeId) {
-    return "SEdge-" + irEdgeId;
+    return "SEdge" + irEdgeId;
   }
 
   /**
@@ -64,18 +62,22 @@ public static String generateStageEdgeId(final String 
irEdgeId) {
    * @return the generated ID
    */
   public static String generateStageId(final Integer stageId) {
-    return "Stage-" + stageId;
+    return "Stage" + stageId;
   }
 
   /**
    * Generates the ID for a task.
    *
-   * @param index   the index of this task.
    * @param stageId the ID of the stage.
+   * @param index   the index of this task.
+   * @param cloneOffset a positive number if this task is a clone of an 
original task.
    * @return the generated ID
    */
-  public static String generateTaskId(final int index, final String stageId) {
-    return stageId + TASK_INFIX + index;
+  public static String generateTaskId(final String stageId, final int index, 
final int cloneOffset) {
+    if (cloneOffset < 0) {
+      throw new IllegalStateException(String.valueOf(cloneOffset));
+    }
+    return stageId + SPLITTER + index + SPLITTER + cloneOffset;
   }
 
   /**
@@ -84,7 +86,7 @@ public static String generateTaskId(final int index, final 
String stageId) {
    * @return the generated ID
    */
   public static String generateExecutorId() {
-    return "Executor-" + executorIdGenerator.getAndIncrement();
+    return "Executor" + executorIdGenerator.getAndIncrement();
   }
 
   /**
@@ -92,11 +94,16 @@ public static String generateExecutorId() {
    *
    * @param runtimeEdgeId of the block
    * @param producerTaskIndex of the block
+   * @param producerTaskCloneOffset if the producer task is a clone.
    * @return the generated ID
    */
   public static String generateBlockId(final String runtimeEdgeId,
-                                       final int producerTaskIndex) {
-    return BLOCK_PREFIX + runtimeEdgeId + BLOCK_ID_SPLITTER + 
producerTaskIndex;
+                                       final int producerTaskIndex,
+                                       final int producerTaskCloneOffset) {
+    if (producerTaskCloneOffset < 0) {
+      throw new IllegalStateException(String.valueOf(producerTaskCloneOffset));
+    }
+    return runtimeEdgeId + SPLITTER + producerTaskIndex + SPLITTER + 
producerTaskCloneOffset;
   }
 
   /**
@@ -114,7 +121,7 @@ public static long generateMessageId() {
    * @return the generated ID
    */
   public static String generateResourceSpecId() {
-    return "ResourceSpec-" + resourceSpecIdGenerator.getAndIncrement();
+    return "ResourceSpec" + resourceSpecIdGenerator.getAndIncrement();
   }
 
   //////////////////////////////////////////////////////////////// Parse IDs
@@ -126,7 +133,7 @@ public static String generateResourceSpecId() {
    * @return the runtime edge ID.
    */
   public static String getRuntimeEdgeIdFromBlockId(final String blockId) {
-    return parseBlockId(blockId)[0];
+    return split(blockId)[0];
   }
 
   /**
@@ -136,19 +143,7 @@ public static String getRuntimeEdgeIdFromBlockId(final 
String blockId) {
    * @return the task index.
    */
   public static String getTaskIndexFromBlockId(final String blockId) {
-    return parseBlockId(blockId)[1];
-  }
-
-  /**
-   * Parses a block id.
-   * The result array will contain runtime edge id and task index in order.
-   *
-   * @param blockId to parse.
-   * @return the array of parsed information.
-   */
-  private static String[] parseBlockId(final String blockId) {
-    final String woPrefix = blockId.split(BLOCK_PREFIX)[1];
-    return woPrefix.split(BLOCK_ID_SPLITTER);
+    return split(blockId)[1];
   }
 
   /**
@@ -158,7 +153,7 @@ public static String getTaskIndexFromBlockId(final String 
blockId) {
    * @return the stage ID.
    */
   public static String getStageIdFromTaskId(final String taskId) {
-    return parseTaskId(taskId)[0];
+    return split(taskId)[0];
   }
 
   /**
@@ -168,17 +163,20 @@ public static String getStageIdFromTaskId(final String 
taskId) {
    * @return the index.
    */
   public static int getIndexFromTaskId(final String taskId) {
-    return Integer.valueOf(parseTaskId(taskId)[1]);
+    return Integer.valueOf(split(taskId)[1]);
   }
 
   /**
-   * Parses a task id.
-   * The result array will contain the stage id and the index of the task in 
order.
+   * Extracts the clone offset from a task ID.
    *
-   * @param taskId to parse.
-   * @return the array of parsed information.
+   * @param taskId the task ID to extract.
+   * @return the clone offset.
    */
-  private static String[] parseTaskId(final String taskId) {
-    return taskId.split(TASK_INFIX);
+  public static int getCloneOffsetFromTaskId(final String taskId) {
+    return Integer.valueOf(split(taskId)[2]);
+  }
+
+  private static String[] split(final String id) {
+    return id.split(SPLITTER);
   }
 }
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index 3218317dc..26a10b3a4 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -22,7 +22,7 @@
 import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
 import edu.snu.nemo.common.exception.DynamicOptimizationException;
 
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.data.KeyRange;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.plan.Stage;
@@ -76,7 +76,7 @@ public PhysicalPlan apply(final PhysicalPlan originalPlan,
 
     // get edges to optimize
     final List<String> optimizationEdgeIds = blockIds.stream().map(blockId ->
-        
RuntimeIdGenerator.getRuntimeEdgeIdFromBlockId(blockId)).collect(Collectors.toList());
+        
RuntimeIdManager.getRuntimeEdgeIdFromBlockId(blockId)).collect(Collectors.toList());
     final DAG<Stage, StageEdge> stageDAG = originalPlan.getStageDAG();
     final List<StageEdge> optimizationEdges = stageDAG.getVertices().stream()
         .flatMap(stage -> stageDAG.getIncomingEdgesOf(stage).stream())
@@ -84,17 +84,17 @@ public PhysicalPlan apply(final PhysicalPlan originalPlan,
         .collect(Collectors.toList());
 
     // Get number of evaluators of the next stage (number of blocks).
-    final Integer numOfDstTasks = 
optimizationEdges.stream().findFirst().orElseThrow(() ->
-        new RuntimeException("optimization edges are 
empty")).getDst().getTaskIds().size();
+    final Integer numOfOriginalDstTasks = 
optimizationEdges.stream().findFirst().orElseThrow(() ->
+        new RuntimeException("optimization edges are 
empty")).getDst().getOriginalTaskIdsSortedByIndex().size();
 
     // Calculate keyRanges.
-    final List<KeyRange> keyRanges = calculateKeyRanges(metricData.right(), 
numOfDstTasks);
+    final List<KeyRange> keyRanges = calculateKeyRanges(metricData.right(), 
numOfOriginalDstTasks);
 
     // Overwrite the previously assigned key range in the physical DAG with 
the new range.
     optimizationEdges.forEach(optimizationEdge -> {
       // Update the information.
       final Map<Integer, KeyRange> taskIdxToHashRange = new HashMap<>();
-      for (int taskIdx = 0; taskIdx < numOfDstTasks; taskIdx++) {
+      for (int taskIdx = 0; taskIdx < numOfOriginalDstTasks; taskIdx++) {
         taskIdxToHashRange.put(taskIdx, keyRanges.get(taskIdx));
       }
       optimizationEdge.setTaskIdxToKeyRange(taskIdxToHashRange);
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index ca4124bad..b9e87e720 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -31,7 +31,7 @@
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.exception.IllegalVertexOperationException;
 import edu.snu.nemo.common.exception.PhysicalPlanGenerationException;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.reef.tang.annotations.Parameter;
 
@@ -139,7 +139,7 @@ private void handleDuplicateEdgeGroupProperty(final 
DAG<Stage, StageEdge> dagOfS
 
     for (final int stageId : vertexSetForEachStage.keySet()) {
       final Set<IRVertex> stageVertices = vertexSetForEachStage.get(stageId);
-      final String stageIdentifier = 
RuntimeIdGenerator.generateStageId(stageId);
+      final String stageIdentifier = RuntimeIdManager.generateStageId(stageId);
       final ExecutionPropertyMap<VertexExecutionProperty> stageProperties = 
new ExecutionPropertyMap<>(stageIdentifier);
       
stagePartitioner.getStageProperties(stageVertices.iterator().next()).forEach(stageProperties::put);
       final int stageParallelism = 
stageProperties.get(ParallelismProperty.class)
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
index c2abcc4ee..b9167e520 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
@@ -21,16 +21,14 @@
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ClonedSchedulingProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import org.apache.commons.lang3.SerializationUtils;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 
 /**
  * Stage.
@@ -75,12 +73,29 @@ public Stage(final String stageId,
   }
 
   /**
-   * @return the list of the task IDs in this stage.
+   * @return (including clones) all task IDs in this stage shuffled.
    */
-  public List<String> getTaskIds() {
+  public List<String> getAllPossiblyClonedTaskIdsShuffled() {
+    // Get possibly cloned task ids.
+    final int cloneNum = 
executionProperties.get(ClonedSchedulingProperty.class).orElse(1);
     final List<String> taskIds = new ArrayList<>();
     for (int taskIdx = 0; taskIdx < getParallelism(); taskIdx++) {
-      taskIds.add(RuntimeIdGenerator.generateTaskId(taskIdx, getId()));
+      for (int cloneOffset = 0; cloneOffset < cloneNum; cloneOffset++) {
+        taskIds.add(RuntimeIdManager.generateTaskId(getId(), taskIdx, 
cloneOffset));
+      }
+    }
+    Collections.shuffle(taskIds); // Shuffle to avoid always scheduling clones 
back-to-back.
+    return taskIds;
+  }
+
+  /**
+   * @return (excluding clones) original task ids sorted.
+   */
+  public List<String> getOriginalTaskIdsSortedByIndex() {
+    final List<String> taskIds = new ArrayList<>();
+    for (int taskIdx = 0; taskIdx < getParallelism(); taskIdx++) {
+      final int cloneOffSetOfOriginalTask = 0;
+      taskIds.add(RuntimeIdManager.generateTaskId(getId(), taskIdx, 
cloneOffSetOfOriginalTask));
     }
     return taskIds;
   }
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
index 98773eacb..a537e5c49 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
@@ -72,18 +72,18 @@
    */
   @VisibleForTesting
   public StageEdge(final String runtimeEdgeId,
-            final ExecutionPropertyMap<EdgeExecutionProperty> edgeProperties,
-            final IRVertex srcVertex,
-            final IRVertex dstVertex,
-            final Stage srcStage,
-            final Stage dstStage,
-            final Boolean isSideInput) {
+                   final ExecutionPropertyMap<EdgeExecutionProperty> 
edgeProperties,
+                   final IRVertex srcVertex,
+                   final IRVertex dstVertex,
+                   final Stage srcStage,
+                   final Stage dstStage,
+                   final Boolean isSideInput) {
     super(runtimeEdgeId, edgeProperties, srcStage, dstStage, isSideInput);
     this.srcVertex = srcVertex;
     this.dstVertex = dstVertex;
     // Initialize the key range of each dst task.
     this.taskIdxToKeyRange = new HashMap<>();
-    for (int taskIdx = 0; taskIdx < dstStage.getTaskIds().size(); taskIdx++) {
+    for (int taskIdx = 0; taskIdx < 
dstStage.getOriginalTaskIdsSortedByIndex().size(); taskIdx++) {
       taskIdxToKeyRange.put(taskIdx, HashRange.of(taskIdx, taskIdx + 1, 
false));
     }
     this.dataCommunicationPatternValue = 
edgeProperties.get(CommunicationPatternProperty.class)
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java 
b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
index 14bffb52f..9677cac30 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
@@ -18,7 +18,7 @@
 import edu.snu.nemo.common.ir.IdManager;
 import 
edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ResourceSitePass;
 import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageParameters;
 import edu.snu.nemo.runtime.master.ClientRPC;
@@ -146,7 +146,7 @@ public void onNext(final StartTime startTime) {
   public final class AllocatedEvaluatorHandler implements 
EventHandler<AllocatedEvaluator> {
     @Override
     public void onNext(final AllocatedEvaluator allocatedEvaluator) {
-      final String executorId = RuntimeIdGenerator.generateExecutorId();
+      final String executorId = RuntimeIdManager.generateExecutorId();
       runtimeMaster.onContainerAllocated(executorId, allocatedEvaluator,
           getExecutorConfiguration(executorId));
     }
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index 52e153e08..b4bdcd73d 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -25,7 +25,7 @@
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.exception.IllegalMessageException;
 import edu.snu.nemo.common.exception.UnknownFailureCauseException;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageContext;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
@@ -135,7 +135,7 @@ private void launchTask(final Task task) {
     } catch (final Exception e) {
       
persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
           ControlMessage.Message.newBuilder()
-              .setId(RuntimeIdGenerator.generateMessageId())
+              .setId(RuntimeIdManager.generateMessageId())
               
.setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
               .setType(ControlMessage.MessageType.ExecutorFailed)
               
.setExecutorFailedMsg(ControlMessage.ExecutorFailedMsg.newBuilder()
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
index d8f3b4151..8e51cdaba 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.executor;
 
 import com.google.protobuf.ByteString;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.common.exception.UnknownFailureCauseException;
@@ -57,7 +57,7 @@ public void flush() {
     flushMetricMessageQueueToMaster();
     
persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
         ControlMessage.Message.newBuilder()
-            .setId(RuntimeIdGenerator.generateMessageId())
+            .setId(RuntimeIdManager.generateMessageId())
             
.setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
             .setType(ControlMessage.MessageType.MetricFlushed)
             .build());
@@ -79,7 +79,7 @@ private synchronized void flushMetricMessageQueueToMaster() {
 
       
persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
           ControlMessage.Message.newBuilder()
-              .setId(RuntimeIdGenerator.generateMessageId())
+              .setId(RuntimeIdManager.generateMessageId())
               
.setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
               .setType(ControlMessage.MessageType.MetricMessageReceived)
               .setMetricMsg(metricMsgBuilder.build())
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
index a707b0321..f82fd115b 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
@@ -17,7 +17,7 @@
 
 import edu.snu.nemo.common.exception.UnknownExecutionStateException;
 import edu.snu.nemo.common.exception.UnknownFailureCauseException;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
@@ -126,7 +126,7 @@ private void notifyTaskStateToMaster(final TaskState.State 
newState,
     // Send taskStateChangedMsg to master!
     
persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
         ControlMessage.Message.newBuilder()
-            .setId(RuntimeIdGenerator.generateMessageId())
+            .setId(RuntimeIdManager.generateMessageId())
             
.setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
             .setType(ControlMessage.MessageType.TaskStateChanged)
             .setTaskStateChangedMsg(msgBuilder.build())
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index ba0a71cf4..067159139 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -27,7 +27,7 @@
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import 
edu.snu.nemo.runtime.common.comm.ControlMessage.ByteTransferContextDescriptor;
 import edu.snu.nemo.runtime.common.data.KeyRange;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
 import edu.snu.nemo.runtime.executor.bytetransfer.ByteInputContext;
@@ -160,7 +160,7 @@ public Block createBlock(final String blockId,
           final CompletableFuture<ControlMessage.Message> 
responseFromMasterFuture = persistentConnectionToMasterMap
               
.getMessageSender(MessageEnvironment.BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID).request(
                   ControlMessage.Message.newBuilder()
-                      .setId(RuntimeIdGenerator.generateMessageId())
+                      .setId(RuntimeIdManager.generateMessageId())
                       
.setListenerId(MessageEnvironment.BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID)
                       .setType(ControlMessage.MessageType.RequestBlockLocation)
                       .setRequestBlockLocationMsg(
@@ -273,7 +273,7 @@ public void writeBlock(final Block block,
 
     
persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID)
         .send(ControlMessage.Message.newBuilder()
-            .setId(RuntimeIdGenerator.generateMessageId())
+            .setId(RuntimeIdManager.generateMessageId())
             
.setListenerId(MessageEnvironment.BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID)
             .setType(ControlMessage.MessageType.BlockStateChanged)
             .setBlockStateChangedMsg(blockStateChangedMsgBuilder.build())
@@ -292,7 +292,7 @@ public void writeBlock(final Block block,
       // TODO #4: Refactor metric aggregation for (general) run-rime 
optimization.
       
persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
           .send(ControlMessage.Message.newBuilder()
-              .setId(RuntimeIdGenerator.generateMessageId())
+              .setId(RuntimeIdManager.generateMessageId())
               
.setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
               .setType(ControlMessage.MessageType.DataSizeMetric)
               
.setDataSizeMetricMsg(ControlMessage.DataSizeMetricMsg.newBuilder()
@@ -331,7 +331,7 @@ public void removeBlock(final String blockId,
 
       
persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID)
           .send(ControlMessage.Message.newBuilder()
-              .setId(RuntimeIdGenerator.generateMessageId())
+              .setId(RuntimeIdManager.generateMessageId())
               
.setListenerId(MessageEnvironment.BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID)
               .setType(ControlMessage.MessageType.BlockStateChanged)
               .setBlockStateChangedMsg(blockStateChangedMsgBuilder)
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/AbstractBlockStore.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/AbstractBlockStore.java
index 761dbedf9..32d46154e 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/AbstractBlockStore.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/AbstractBlockStore.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.executor.data.stores;
 
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.executor.data.SerializerManager;
 import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
 
@@ -41,7 +41,7 @@ protected AbstractBlockStore(final SerializerManager 
serializerManager) {
    * @return the coder.
    */
   protected final Serializer getSerializerFromWorker(final String blockId) {
-    final String runtimeEdgeId = 
RuntimeIdGenerator.getRuntimeEdgeIdFromBlockId(blockId);
+    final String runtimeEdgeId = 
RuntimeIdManager.getRuntimeEdgeIdFromBlockId(blockId);
     return serializerManager.getSerializer(runtimeEdgeId);
   }
 }
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
index 36db3ec1c..059192fb9 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
@@ -43,15 +43,17 @@ private 
DataTransferFactory(@Parameter(JobConf.HashRangeMultiplier.class) final
    *
    * @param srcIRVertex the {@link IRVertex} that outputs the data to be 
written.
    * @param srcTaskIdx  the index of the source task.
+   * @param cloneOffset the clone offset of the source task.
    * @param dstIRVertex the {@link IRVertex} that will take the output data as 
its input.
    * @param runtimeEdge that connects the srcTask to the tasks belonging to 
dstIRVertex.
    * @return the {@link OutputWriter} created.
    */
   public OutputWriter createWriter(final IRVertex srcIRVertex,
                                    final int srcTaskIdx,
+                                   final int cloneOffset,
                                    final IRVertex dstIRVertex,
                                    final RuntimeEdge<?> runtimeEdge) {
-    return new OutputWriter(hashRangeMultiplier, srcTaskIdx,
+    return new OutputWriter(hashRangeMultiplier, srcTaskIdx, cloneOffset,
         srcIRVertex.getId(), dstIRVertex, runtimeEdge, blockManagerWorker);
   }
 
@@ -59,13 +61,15 @@ public OutputWriter createWriter(final IRVertex srcIRVertex,
    * Creates an {@link InputReader} between two stages.
    *
    * @param dstTaskIdx  the index of the destination task.
+   * @param cloneOffset the clone offset of the destination task.
    * @param srcIRVertex the {@link IRVertex} that output the data to be read.
    * @param runtimeEdge that connects the tasks belonging to srcIRVertex to 
dstTask.
    * @return the {@link InputReader} created.
    */
   public InputReader createReader(final int dstTaskIdx,
+                                  final int cloneOffset,
                                   final IRVertex srcIRVertex,
                                   final RuntimeEdge runtimeEdge) {
-    return new InputReader(dstTaskIdx, srcIRVertex, runtimeEdge, 
blockManagerWorker);
+    return new InputReader(dstTaskIdx, cloneOffset, srcIRVertex, runtimeEdge, 
blockManagerWorker);
   }
 }
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index 4471ae4d3..96449f1de 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@ -22,7 +22,7 @@
 import 
edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.data.KeyRange;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
@@ -47,6 +47,7 @@
 public final class InputReader extends DataTransfer {
   private static final Logger LOG = 
LoggerFactory.getLogger(InputReader.class.getName());
   private final int dstTaskIndex;
+  private final int cloneOffset;
   private final BlockManagerWorker blockManagerWorker;
 
   /**
@@ -56,11 +57,13 @@
   private final RuntimeEdge runtimeEdge;
 
   public InputReader(final int dstTaskIndex,
+                     final int cloneOffset,
                      final IRVertex srcVertex,
                      final RuntimeEdge runtimeEdge,
                      final BlockManagerWorker blockManagerWorker) {
     super(runtimeEdge.getId());
     this.dstTaskIndex = dstTaskIndex;
+    this.cloneOffset = cloneOffset;
     this.srcVertex = srcVertex;
     this.runtimeEdge = runtimeEdge;
     this.blockManagerWorker = blockManagerWorker;
@@ -150,10 +153,10 @@ private String getBlockId(final int taskIdx) {
     final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
         runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
     if (!duplicateDataProperty.isPresent() || 
duplicateDataProperty.get().getGroupSize() <= 1) {
-      return RuntimeIdGenerator.generateBlockId(getId(), taskIdx);
+      return RuntimeIdManager.generateBlockId(getId(), taskIdx, cloneOffset);
     }
     final String duplicateEdgeId = 
duplicateDataProperty.get().getRepresentativeEdgeId();
-    return RuntimeIdGenerator.generateBlockId(duplicateEdgeId, taskIdx);
+    return RuntimeIdManager.generateBlockId(duplicateEdgeId, taskIdx, 
cloneOffset);
   }
 
   public IRVertex getSrcIrVertex() {
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
index 71d810a95..ebcf967b4 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -20,7 +20,7 @@
 import edu.snu.nemo.common.ir.edge.executionproperty.*;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
 import edu.snu.nemo.runtime.executor.data.block.Block;
@@ -48,6 +48,7 @@
    *
    * @param hashRangeMultiplier the {@link 
edu.snu.nemo.conf.JobConf.HashRangeMultiplier}.
    * @param srcTaskIdx          the index of the source task.
+   * @param cloneOffset         the clone offset of the source task.
    * @param srcRuntimeVertexId  the ID of the source vertex.
    * @param dstIrVertex         the destination IR vertex.
    * @param runtimeEdge         the {@link RuntimeEdge}.
@@ -55,12 +56,13 @@
    */
   OutputWriter(final int hashRangeMultiplier,
                final int srcTaskIdx,
+               final int cloneOffset,
                final String srcRuntimeVertexId,
                final IRVertex dstIrVertex,
                final RuntimeEdge<?> runtimeEdge,
                final BlockManagerWorker blockManagerWorker) {
     super(runtimeEdge.getId());
-    this.blockId = RuntimeIdGenerator.generateBlockId(getId(), srcTaskIdx);
+    this.blockId = RuntimeIdManager.generateBlockId(getId(), srcTaskIdx, 
cloneOffset);
     this.runtimeEdge = runtimeEdge;
     this.srcVertexId = srcRuntimeVertexId;
     this.dstIrVertex = dstIrVertex;
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
index 7d1901daa..eac2f54ab 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
@@ -23,7 +23,7 @@
 import 
edu.snu.nemo.common.ir.edge.executionproperty.AdditionalOutputTagProperty;
 import edu.snu.nemo.common.ir.vertex.*;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
@@ -133,7 +133,8 @@ public TaskExecutor(final Task task,
   private Pair<List<DataFetcher>, List<VertexHarness>> prepare(final Task task,
                                                                final 
DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
                                                                final 
DataTransferFactory dataTransferFactory) {
-    final int taskIndex = 
RuntimeIdGenerator.getIndexFromTaskId(task.getTaskId());
+    final int taskIndex = 
RuntimeIdManager.getIndexFromTaskId(task.getTaskId());
+    final int cloneOffset = 
RuntimeIdManager.getCloneOffsetFromTaskId(task.getTaskId());
 
     // Traverse in a reverse-topological order to ensure that each visited 
vertex's children vertices exist.
     final List<IRVertex> reverseTopologicallySorted = 
Lists.reverse(irVertexDag.getTopologicalSort());
@@ -163,12 +164,11 @@ public TaskExecutor(final Task task,
 
       // Handle writes
       // Main output children task writes
-      final List<OutputWriter> mainChildrenTaskWriters = 
getMainChildrenTaskWriters(
-          taskIndex, irVertex, task.getTaskOutgoingEdges(), 
dataTransferFactory, additionalOutputMap);
+      final List<OutputWriter> mainChildrenTaskWriters = 
getMainChildrenTaskWriters(irVertex,
+          taskIndex, cloneOffset, task.getTaskOutgoingEdges(), 
dataTransferFactory, additionalOutputMap);
       // Additional output children task writes
-      final Map<String, OutputWriter> additionalChildrenTaskWriters = 
getAdditionalChildrenTaskWriters(
-          taskIndex, irVertex, task.getTaskOutgoingEdges(), 
dataTransferFactory, additionalOutputMap);
-      // Find all main vertices and additional vertices
+      final Map<String, OutputWriter> additionalChildrenTaskWriters = 
getAdditionalChildrenTaskWriters(irVertex,
+          taskIndex, cloneOffset, task.getTaskOutgoingEdges(), 
dataTransferFactory, additionalOutputMap);
       final List<String> additionalOutputVertices = new 
ArrayList<>(additionalOutputMap.values());
       final Set<String> mainChildren =
           getMainOutputVertices(irVertex, irVertexDag, 
task.getTaskOutgoingEdges(), additionalOutputVertices);
@@ -188,7 +188,7 @@ public TaskExecutor(final Task task,
             irVertex, sourceReader.get(), vertexHarness, isToSideInput)); // 
Source vertex read
       }
       final List<InputReader> parentTaskReaders =
-          getParentTaskReaders(taskIndex, irVertex, 
task.getTaskIncomingEdges(), dataTransferFactory);
+          getParentTaskReaders(irVertex, taskIndex, cloneOffset, 
task.getTaskIncomingEdges(), dataTransferFactory);
       parentTaskReaders.forEach(parentTaskReader -> {
         dataFetcherList.add(new 
ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
             vertexHarness, isToSideInput)); // Parent-task read
@@ -457,15 +457,16 @@ private boolean handleDataFetchers(final 
List<DataFetcher> fetchers) {
     }
   }
 
-  private List<InputReader> getParentTaskReaders(final int taskIndex,
-                                                 final IRVertex irVertex,
+  private List<InputReader> getParentTaskReaders(final IRVertex irVertex,
+                                                 final int taskIndex,
+                                                 final int cloneOffset,
                                                  final List<StageEdge> 
inEdgesFromParentTasks,
                                                  final DataTransferFactory 
dataTransferFactory) {
     return inEdgesFromParentTasks
         .stream()
         .filter(inEdge -> 
inEdge.getDstIRVertex().getId().equals(irVertex.getId()))
         .map(inEdgeForThisVertex -> dataTransferFactory
-            .createReader(taskIndex, inEdgeForThisVertex.getSrcIRVertex(), 
inEdgeForThisVertex))
+            .createReader(taskIndex, cloneOffset, 
inEdgeForThisVertex.getSrcIRVertex(), inEdgeForThisVertex))
         .collect(Collectors.toList());
   }
 
@@ -494,15 +495,17 @@ private boolean handleDataFetchers(final 
List<DataFetcher> fetchers) {
 
   /**
    * Return inter-task OutputWriters, for single output or output associated 
with main tag.
-   * @param taskIndex               current task index
    * @param irVertex                source irVertex
+   * @param taskIndex               current task index
+   * @param cloneOffset             current task's clone offset
    * @param outEdgesToChildrenTasks outgoing edges to child tasks
    * @param dataTransferFactory     dataTransferFactory
    * @param taggedOutputs           tag to vertex id map
    * @return OutputWriters for main children tasks
    */
-  private List<OutputWriter> getMainChildrenTaskWriters(final int taskIndex,
-                                                        final IRVertex 
irVertex,
+  private List<OutputWriter> getMainChildrenTaskWriters(final IRVertex 
irVertex,
+                                                        final int taskIndex,
+                                                        final int cloneOffset,
                                                         final List<StageEdge> 
outEdgesToChildrenTasks,
                                                         final 
DataTransferFactory dataTransferFactory,
                                                         final Map<String, 
String> taggedOutputs) {
@@ -510,22 +513,24 @@ private boolean handleDataFetchers(final 
List<DataFetcher> fetchers) {
         .stream()
         .filter(outEdge -> 
outEdge.getSrcIRVertex().getId().equals(irVertex.getId()))
         .filter(outEdge -> 
!taggedOutputs.containsValue(outEdge.getDstIRVertex().getId()))
-        .map(outEdgeForThisVertex -> dataTransferFactory
-            .createWriter(irVertex, taskIndex, 
outEdgeForThisVertex.getDstIRVertex(), outEdgeForThisVertex))
+        .map(outEdgeForThisVertex -> 
dataTransferFactory.createWriter(irVertex, taskIndex, cloneOffset,
+            outEdgeForThisVertex.getDstIRVertex(), outEdgeForThisVertex))
         .collect(Collectors.toList());
   }
 
   /**
    * Return inter-task OutputWriters associated with additional output tags.
-   * @param taskIndex               current task index
    * @param irVertex                source irVertex
+   * @param taskIndex               current task index
+   * @param cloneOffset             current task's clone offset
    * @param outEdgesToChildrenTasks outgoing edges to child tasks
    * @param dataTransferFactory     dataTransferFactory
    * @param taggedOutputs           tag to vertex id map
    * @return additional children vertex id to OutputWriters map.
    */
-  private Map<String, OutputWriter> getAdditionalChildrenTaskWriters(final int 
taskIndex,
-                                                                     final 
IRVertex irVertex,
+  private Map<String, OutputWriter> getAdditionalChildrenTaskWriters(final 
IRVertex irVertex,
+                                                                     final int 
taskIndex,
+                                                                     final int 
cloneOffset,
                                                                      final 
List<StageEdge> outEdgesToChildrenTasks,
                                                                      final 
DataTransferFactory dataTransferFactory,
                                                                      final 
Map<String, String> taggedOutputs) {
@@ -537,7 +542,7 @@ private boolean handleDataFetchers(final List<DataFetcher> 
fetchers) {
         .filter(outEdge -> 
taggedOutputs.containsValue(outEdge.getDstIRVertex().getId()))
         .forEach(outEdgeForThisVertex -> {
           
additionalChildrenTaskWriters.put(outEdgeForThisVertex.getDstIRVertex().getId(),
-              dataTransferFactory.createWriter(irVertex, taskIndex, 
outEdgeForThisVertex.getDstIRVertex(),
+              dataTransferFactory.createWriter(irVertex, taskIndex, 
cloneOffset, outEdgeForThisVertex.getDstIRVertex(),
                   outEdgeForThisVertex));
         });
 
@@ -578,7 +583,7 @@ private void closeTransform(final VertexHarness 
vertexHarness) {
     vertexHarness.getContext().getSerializedData().ifPresent(data ->
         
persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
             ControlMessage.Message.newBuilder()
-                .setId(RuntimeIdGenerator.generateMessageId())
+                .setId(RuntimeIdManager.generateMessageId())
                 
.setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
                 .setType(ControlMessage.MessageType.ExecutorDataCollected)
                 
.setDataCollected(ControlMessage.DataCollectMessage.newBuilder().setData(data).build())
diff --git 
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
index 8f1b7e552..a6df0bfba 100644
--- 
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
+++ 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
@@ -19,7 +19,7 @@
 import edu.snu.nemo.common.coder.*;
 import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
 import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.data.HashRange;
 import edu.snu.nemo.runtime.common.data.KeyRange;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
@@ -69,6 +69,7 @@
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({BlockManagerMaster.class, RuntimeMaster.class, 
SerializerManager.class})
 public final class BlockStoreTest {
+  private static final int CLONE_OFFSET = 0;
   private static final String TMP_FILE_DIRECTORY = "./tmpFiles";
   private static final Serializer SERIALIZER = new Serializer(
       PairEncoderFactory.of(IntEncoderFactory.of(), IntEncoderFactory.of()),
@@ -121,10 +122,10 @@ public void setUp() throws Exception {
     IntStream.range(0, NUM_READ_VERTICES).forEach(number -> 
readTaskIdList.add("Read_IR_vertex"));
 
     // Generates the ids and the data of the blocks to be used.
-    final String shuffleEdge = 
RuntimeIdGenerator.generateStageEdgeId("shuffle_edge");
+    final String shuffleEdge = 
RuntimeIdManager.generateStageEdgeId("shuffle_edge");
     IntStream.range(0, NUM_WRITE_VERTICES).forEach(writeTaskIdx -> {
       // Create a block for each writer task.
-      final String blockId = RuntimeIdGenerator.generateBlockId(shuffleEdge, 
writeTaskIdx);
+      final String blockId = RuntimeIdManager.generateBlockId(shuffleEdge, 
writeTaskIdx, CLONE_OFFSET);
       blockIdList.add(blockId);
       blockManagerMaster.initializeState(blockId, "Unused");
       blockManagerMaster.onBlockStateChanged(
@@ -143,10 +144,11 @@ public void setUp() throws Exception {
     // Following part is for the concurrent read test.
     final String writeTaskId = "conc_write_IR_vertex";
     final List<String> concReadTaskIdList = new 
ArrayList<>(NUM_CONC_READ_TASKS);
-    final String concEdge = 
RuntimeIdGenerator.generateStageEdgeId("conc_read_edge");
+    final String concEdge = 
RuntimeIdManager.generateStageEdgeId("conc_read_edge");
 
     // Generates the ids and the data to be used.
-    concBlockId = RuntimeIdGenerator.generateBlockId(concEdge, 
NUM_WRITE_VERTICES + NUM_READ_VERTICES + 1);
+    concBlockId = RuntimeIdManager.generateBlockId(
+        concEdge, NUM_WRITE_VERTICES + NUM_READ_VERTICES + 1, CLONE_OFFSET);
     blockManagerMaster.initializeState(concBlockId, "unused");
     blockManagerMaster.onBlockStateChanged(
         concBlockId, BlockState.State.IN_PROGRESS, null);
@@ -165,12 +167,12 @@ public void setUp() throws Exception {
     // Generates the ids of the tasks to be used.
     IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(number -> 
writeHashTaskIdList.add("hash_write_IR_vertex"));
     IntStream.range(0, NUM_READ_HASH_TASKS).forEach(number -> 
readHashTaskIdList.add("hash_read_IR_vertex"));
-    final String hashEdge = 
RuntimeIdGenerator.generateStageEdgeId("hash_edge");
+    final String hashEdge = RuntimeIdManager.generateStageEdgeId("hash_edge");
 
     // Generates the ids and the data of the blocks to be used.
     IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(writeTaskIdx -> {
-      final String blockId = RuntimeIdGenerator.generateBlockId(
-          hashEdge, NUM_WRITE_VERTICES + NUM_READ_VERTICES + 1 + writeTaskIdx);
+      final String blockId = RuntimeIdManager.generateBlockId(
+          hashEdge, NUM_WRITE_VERTICES + NUM_READ_VERTICES + 1 + writeTaskIdx, 
CLONE_OFFSET);
       hashedBlockIdList.add(blockId);
       blockManagerMaster.initializeState(blockId, "Unused");
       blockManagerMaster.onBlockStateChanged(
diff --git 
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index c755543af..ca06497b5 100644
--- 
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -30,7 +30,7 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageParameters;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
@@ -86,6 +86,8 @@
 @PrepareForTest({PubSubEventHandlerWrapper.class, 
UpdatePhysicalPlanEventHandler.class, MetricMessageHandler.class,
     SourceVertex.class, ClientRPC.class, MetricManagerMaster.class})
 public final class DataTransferTest {
+  private static final int CLONE_OFFSET = 0;
+
   private static final String EXECUTOR_ID_PREFIX = "Executor";
   private static final DataStoreProperty.Value MEMORY_STORE =
       DataStoreProperty.Value.MemoryStore;
@@ -305,15 +307,15 @@ private void writeAndRead(final BlockManagerWorker sender,
 
     final IRVertex srcMockVertex = mock(IRVertex.class);
     final IRVertex dstMockVertex = mock(IRVertex.class);
-    final Stage srcStage = setupStages("srcStage-" + testIndex);
-    final Stage dstStage = setupStages("dstStage-" + testIndex);
+    final Stage srcStage = setupStages("srcStage" + testIndex);
+    final Stage dstStage = setupStages("dstStage" + testIndex);
     dummyEdge = new StageEdge(edgeId, edgeProperties, srcMockVertex, 
dstMockVertex,
         srcStage, dstStage, false);
 
     // Initialize states in Master
-    srcStage.getTaskIds().forEach(srcTaskId -> {
-      final String blockId = RuntimeIdGenerator.generateBlockId(
-          edgeId, RuntimeIdGenerator.getIndexFromTaskId(srcTaskId));
+    srcStage.getAllPossiblyClonedTaskIdsShuffled().forEach(srcTaskId -> {
+      final String blockId = RuntimeIdManager.generateBlockId(edgeId, 
RuntimeIdManager.getIndexFromTaskId(srcTaskId),
+          RuntimeIdManager.getCloneOffsetFromTaskId(srcTaskId));
       master.initializeState(blockId, srcTaskId);
       master.onProducerTaskScheduled(srcTaskId);
     });
@@ -322,7 +324,8 @@ private void writeAndRead(final BlockManagerWorker sender,
     final List<List> dataWrittenList = new ArrayList<>();
     IntStream.range(0, PARALLELISM_TEN).forEach(srcTaskIndex -> {
       final List dataWritten = getRangedNumList(0, PARALLELISM_TEN);
-      final OutputWriter writer = transferFactory.createWriter(srcVertex, 
srcTaskIndex, dstVertex, dummyEdge);
+      final OutputWriter writer =
+          transferFactory.createWriter(srcVertex, srcTaskIndex, CLONE_OFFSET, 
dstVertex, dummyEdge);
       dataWritten.iterator().forEachRemaining(writer::write);
       writer.close();
       dataWrittenList.add(dataWritten);
@@ -332,7 +335,7 @@ private void writeAndRead(final BlockManagerWorker sender,
     final List<List> dataReadList = new ArrayList<>();
     IntStream.range(0, PARALLELISM_TEN).forEach(dstTaskIndex -> {
       final InputReader reader =
-          new InputReader(dstTaskIndex, srcVertex, dummyEdge, receiver);
+          new InputReader(dstTaskIndex, CLONE_OFFSET, srcVertex, dummyEdge, 
receiver);
 
       assertEquals(PARALLELISM_TEN, reader.getSourceParallelism());
 
@@ -390,21 +393,21 @@ private void writeAndReadWithDuplicateData(final BlockManagerWorker sender,
 
     final IRVertex srcMockVertex = mock(IRVertex.class);
     final IRVertex dstMockVertex = mock(IRVertex.class);
-    final Stage srcStage = setupStages("srcStage-" + testIndex);
-    final Stage dstStage = setupStages("dstStage-" + testIndex);
+    final Stage srcStage = setupStages("srcStage" + testIndex);
+    final Stage dstStage = setupStages("dstStage" + testIndex);
     dummyEdge = new StageEdge(edgeId, edgeProperties, srcMockVertex, 
dstMockVertex,
         srcStage, dstStage, false);
     final IRVertex dstMockVertex2 = mock(IRVertex.class);
-    final Stage dstStage2 = setupStages("dstStage-" + testIndex2);
+    final Stage dstStage2 = setupStages("dstStage" + testIndex2);
     dummyEdge2 = new StageEdge(edgeId2, edgeProperties, srcMockVertex, 
dstMockVertex2,
         srcStage, dstStage, false);
     // Initialize states in Master
-    srcStage.getTaskIds().forEach(srcTaskId -> {
-      final String blockId = RuntimeIdGenerator.generateBlockId(
-          edgeId, RuntimeIdGenerator.getIndexFromTaskId(srcTaskId));
+    srcStage.getAllPossiblyClonedTaskIdsShuffled().forEach(srcTaskId -> {
+      final String blockId = RuntimeIdManager.generateBlockId(
+          edgeId, RuntimeIdManager.getIndexFromTaskId(srcTaskId), 
RuntimeIdManager.getCloneOffsetFromTaskId(srcTaskId));
       master.initializeState(blockId, srcTaskId);
-      final String blockId2 = RuntimeIdGenerator.generateBlockId(
-          edgeId2, RuntimeIdGenerator.getIndexFromTaskId(srcTaskId));
+      final String blockId2 = RuntimeIdManager.generateBlockId(edgeId2,
+          RuntimeIdManager.getIndexFromTaskId(srcTaskId), 
RuntimeIdManager.getCloneOffsetFromTaskId(srcTaskId));
       master.initializeState(blockId2, srcTaskId);
       master.onProducerTaskScheduled(srcTaskId);
     });
@@ -413,12 +416,14 @@ private void writeAndReadWithDuplicateData(final BlockManagerWorker sender,
     final List<List> dataWrittenList = new ArrayList<>();
     IntStream.range(0, PARALLELISM_TEN).forEach(srcTaskIndex -> {
       final List dataWritten = getRangedNumList(0, PARALLELISM_TEN);
-      final OutputWriter writer = transferFactory.createWriter(srcVertex, 
srcTaskIndex, dstVertex, dummyEdge);
+      final OutputWriter writer =
+          transferFactory.createWriter(srcVertex, srcTaskIndex, CLONE_OFFSET, 
dstVertex, dummyEdge);
       dataWritten.iterator().forEachRemaining(writer::write);
       writer.close();
       dataWrittenList.add(dataWritten);
 
-      final OutputWriter writer2 = transferFactory.createWriter(srcVertex, 
srcTaskIndex, dstVertex, dummyEdge2);
+      final OutputWriter writer2 =
+          transferFactory.createWriter(srcVertex, srcTaskIndex, CLONE_OFFSET, 
dstVertex, dummyEdge2);
       dataWritten.iterator().forEachRemaining(writer2::write);
       writer2.close();
     });
@@ -428,9 +433,9 @@ private void writeAndReadWithDuplicateData(final BlockManagerWorker sender,
     final List<List> dataReadList2 = new ArrayList<>();
     IntStream.range(0, PARALLELISM_TEN).forEach(dstTaskIndex -> {
       final InputReader reader =
-          new InputReader(dstTaskIndex, srcVertex, dummyEdge, receiver);
+          new InputReader(dstTaskIndex, CLONE_OFFSET, srcVertex, dummyEdge, 
receiver);
       final InputReader reader2 =
-          new InputReader(dstTaskIndex, srcVertex, dummyEdge2, receiver);
+          new InputReader(dstTaskIndex, CLONE_OFFSET, srcVertex, dummyEdge2, 
receiver);
 
       assertEquals(PARALLELISM_TEN, reader.getSourceParallelism());
 
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
index e810989bc..d36912488 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -30,7 +30,7 @@
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
 import edu.snu.nemo.runtime.common.plan.Stage;
 import edu.snu.nemo.runtime.common.plan.Task;
@@ -70,6 +70,7 @@
 @PrepareForTest({InputReader.class, OutputWriter.class, 
DataTransferFactory.class,
     TaskStateManager.class, StageEdge.class, 
PersistentConnectionToMasterMap.class, Stage.class, IREdge.class})
 public final class TaskExecutorTest {
+  private static final int CLONE_OFFSET = 0;
   private static final int DATA_SIZE = 100;
   private static final ExecutionPropertyMap<VertexExecutionProperty> 
TASK_EXECUTION_PROPERTY_MAP
       = new ExecutionPropertyMap<>("TASK_EXECUTION_PROPERTY_MAP");
@@ -83,8 +84,8 @@
   private AtomicInteger stageId;
 
   private String generateTaskId() {
-    return RuntimeIdGenerator.generateTaskId(0,
-        RuntimeIdGenerator.generateStageId(stageId.getAndIncrement()));
+    return RuntimeIdManager.generateTaskId(
+        RuntimeIdManager.generateStageId(stageId.getAndIncrement()), 0, 
CLONE_OFFSET);
   }
 
   @Before
@@ -98,8 +99,8 @@ public void setUp() throws Exception {
     // Mock a DataTransferFactory.
     vertexIdToOutputData = new HashMap<>();
     dataTransferFactory = mock(DataTransferFactory.class);
-    when(dataTransferFactory.createReader(anyInt(), any(), any())).then(new 
ParentTaskReaderAnswer());
-    when(dataTransferFactory.createWriter(any(), anyInt(), any(), 
any())).then(new ChildTaskWriterAnswer());
+    when(dataTransferFactory.createReader(anyInt(), anyInt(), any(), 
any())).then(new ParentTaskReaderAnswer());
+    when(dataTransferFactory.createWriter(any(), anyInt(), anyInt(), any(), 
any())).then(new ChildTaskWriterAnswer());
 
     // Mock a MetricMessageSender.
     metricMessageSender = mock(MetricMessageSender.class);
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
index 546103972..7d8a08b56 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
@@ -21,7 +21,7 @@
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.exception.AbsentBlockException;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.message.MessageContext;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageListener;
@@ -43,7 +43,6 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.IntStream;
 
 import org.apache.reef.annotations.audience.DriverSide;
 import org.slf4j.Logger;
@@ -82,29 +81,32 @@ private BlockManagerMaster(final MessageEnvironment masterMessageEnvironment) {
     this.lock = new ReentrantReadWriteLock();
   }
 
-  public void initialize(final PhysicalPlan physicalPlan) {
+  void initialize(final PhysicalPlan physicalPlan) {
     final DAG<Stage, StageEdge> stageDAG = physicalPlan.getStageDAG();
     stageDAG.topologicalDo(stage -> {
-      final List<String> taskIdsForStage = stage.getTaskIds();
+      final List<String> srcTaskIdsOfStage = 
stage.getAllPossiblyClonedTaskIdsShuffled();
       final List<StageEdge> stageOutgoingEdges = 
stageDAG.getOutgoingEdgesOf(stage);
 
       // Initialize states for blocks of inter-stage edges
       stageOutgoingEdges.forEach(stageEdge -> {
-        final int srcParallelism = taskIdsForStage.size();
-        IntStream.range(0, srcParallelism).forEach(srcTaskIdx -> {
-          final String blockId = 
RuntimeIdGenerator.generateBlockId(stageEdge.getId(), srcTaskIdx);
-          initializeState(blockId, taskIdsForStage.get(srcTaskIdx));
+        srcTaskIdsOfStage.forEach(taskId -> {
+          final int srcTaskIndex = RuntimeIdManager.getIndexFromTaskId(taskId);
+          final int srcTaskCloneOffset = 
RuntimeIdManager.getCloneOffsetFromTaskId(taskId);
+          final String blockId = 
RuntimeIdManager.generateBlockId(stageEdge.getId(), srcTaskIndex, 
srcTaskCloneOffset);
+          initializeState(blockId, taskId);
         });
       });
 
       // Initialize states for blocks of stage internal edges
-      taskIdsForStage.forEach(taskId -> {
+      srcTaskIdsOfStage.forEach(taskId -> {
         final DAG<IRVertex, RuntimeEdge<IRVertex>> taskInternalDag = 
stage.getIRDAG();
         taskInternalDag.getVertices().forEach(task -> {
           final List<RuntimeEdge<IRVertex>> internalOutgoingEdges = 
taskInternalDag.getOutgoingEdgesOf(task);
           internalOutgoingEdges.forEach(taskRuntimeEdge -> {
-            final int srcTaskIdx = 
RuntimeIdGenerator.getIndexFromTaskId(taskId);
-            final String blockId = 
RuntimeIdGenerator.generateBlockId(taskRuntimeEdge.getId(), srcTaskIdx);
+            final int srcTaskIdx = RuntimeIdManager.getIndexFromTaskId(taskId);
+            final int srcTaskCloneOffset = 
RuntimeIdManager.getCloneOffsetFromTaskId(taskId);
+            final String blockId =
+                RuntimeIdManager.generateBlockId(taskRuntimeEdge.getId(), 
srcTaskIdx, srcTaskCloneOffset);
             initializeState(blockId, taskId);
           });
         });
@@ -454,7 +456,7 @@ void registerRequest(final long requestId,
         }
         messageContext.reply(
             ControlMessage.Message.newBuilder()
-                .setId(RuntimeIdGenerator.generateMessageId())
+                .setId(RuntimeIdManager.generateMessageId())
                 .setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID)
                 .setType(ControlMessage.MessageType.BlockLocationInfo)
                 .setBlockLocationInfoMsg(infoMsgBuilder.build())
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricManagerMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricManagerMaster.java
index 4a266d548..00a32c42f 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricManagerMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricManagerMaster.java
@@ -17,7 +17,7 @@
 
 import javax.inject.Inject;
 
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
@@ -46,7 +46,7 @@ private MetricManagerMaster(final ExecutorRegistry executorRegistry) {
   public synchronized void sendMetricFlushRequest() {
     executorRegistry.viewExecutors(executors -> executors.forEach(executor -> {
       final ControlMessage.Message message = 
ControlMessage.Message.newBuilder()
-          .setId(RuntimeIdGenerator.generateMessageId())
+          .setId(RuntimeIdManager.generateMessageId())
           .setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID)
           .setType(ControlMessage.MessageType.RequestMetricFlush)
           .build();
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java
index dd9f302c2..77c88ba3f 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java
@@ -20,7 +20,7 @@
 import edu.snu.nemo.common.exception.SchedulingException;
 import edu.snu.nemo.common.exception.UnknownExecutionStateException;
 import edu.snu.nemo.common.StateMachine;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.plan.Stage;
 import edu.snu.nemo.runtime.common.state.PlanState;
@@ -121,7 +121,7 @@ private void initializeComputationStates() {
     // Initialize the states for the plan down to task-level.
     physicalPlan.getStageDAG().topologicalDo(stage -> {
       idToStageStates.put(stage.getId(), new StageState());
-      stage.getTaskIds().forEach(taskId -> {
+      stage.getAllPossiblyClonedTaskIdsShuffled().forEach(taskId -> {
         idToTaskStates.put(taskId, new TaskState());
         taskIdToCurrentAttempt.put(taskId, 1);
       });
@@ -172,8 +172,9 @@ public synchronized void onTaskStateChanged(final String taskId, final TaskState
     }
 
     // Change stage state, if needed
-    final String stageId = RuntimeIdGenerator.getStageIdFromTaskId(taskId);
-    final List<String> tasksOfThisStage = 
physicalPlan.getStageDAG().getVertexById(stageId).getTaskIds();
+    final String stageId = RuntimeIdManager.getStageIdFromTaskId(taskId);
+    final List<String> tasksOfThisStage =
+        
physicalPlan.getStageDAG().getVertexById(stageId).getAllPossiblyClonedTaskIdsShuffled();
     final long numOfCompletedOrOnHoldTasksInThisStage = tasksOfThisStage
         .stream()
         .map(this::getTaskState)
@@ -387,7 +388,7 @@ public synchronized String toString() {
       sb.append("\"tasks\": [");
 
       boolean isFirstTask = true;
-      for (final String taskId : stage.getTaskIds()) {
+      for (final String taskId : stage.getAllPossiblyClonedTaskIdsShuffled()) {
         if (!isFirstTask) {
           sb.append(", ");
         }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index d0e21d8b9..c738b2af4 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -17,7 +17,7 @@
 
 import com.google.protobuf.ByteString;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ResourceSlotProperty;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageSender;
@@ -112,7 +112,7 @@ public void onTaskScheduled(final Task task) {
       final byte[] serialized = SerializationUtils.serialize(task);
       sendControlMessage(
           ControlMessage.Message.newBuilder()
-              .setId(RuntimeIdGenerator.generateMessageId())
+              .setId(RuntimeIdManager.generateMessageId())
               .setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID)
               .setType(ControlMessage.MessageType.ScheduleTask)
               .setScheduleTaskMsg(
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ResourceSpecification.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ResourceSpecification.java
index 173a477a6..d0b3958bf 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ResourceSpecification.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ResourceSpecification.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.resource;
 
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 
 /**
  * Represents the specifications of a resource.
@@ -38,7 +38,7 @@ public ResourceSpecification(final String containerType,
                                final int capacity,
                                final int memory,
                                final int poisonSec) {
-    this.resourceSpecId = RuntimeIdGenerator.generateResourceSpecId();
+    this.resourceSpecId = RuntimeIdManager.generateResourceSpecId();
     this.containerType = containerType;
     this.capacity = capacity;
     this.memory = memory;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
index 24c0e435f..8fdf2b743 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -21,7 +21,7 @@
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEvent;
 import edu.snu.nemo.runtime.common.plan.*;
 import edu.snu.nemo.runtime.common.state.BlockState;
@@ -41,7 +41,6 @@
 import javax.inject.Inject;
 import java.util.*;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import org.slf4j.Logger;
 
@@ -176,7 +175,7 @@ public void onTaskStateReportFromExecutor(final String executorId,
         case COMPLETE:
         case ON_HOLD:
           // If the stage has completed
-          final String stageIdForTaskUponCompletion = 
RuntimeIdGenerator.getStageIdFromTaskId(taskId);
+          final String stageIdForTaskUponCompletion = 
RuntimeIdManager.getStageIdFromTaskId(taskId);
           if 
(planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE))
 {
             if (!planStateManager.isPlanDone()) {
               doSchedule();
@@ -271,7 +270,7 @@ private void doSchedule() {
 
       LOG.info("Scheduling some tasks in {}, which are in the same 
ScheduleGroup", tasksToSchedule.stream()
           .map(Task::getTaskId)
-          .map(RuntimeIdGenerator::getStageIdFromTaskId)
+          .map(RuntimeIdManager::getStageIdFromTaskId)
           .collect(Collectors.toSet()));
 
       // Set the pointer to the schedulable tasks.
@@ -304,7 +303,7 @@ private void doSchedule() {
         physicalPlan.getStageDAG().getOutgoingEdgesOf(stageToSchedule.getId());
 
     final List<String> taskIdsToSchedule = new LinkedList<>();
-    for (final String taskId : stageToSchedule.getTaskIds()) {
+    for (final String taskId : 
stageToSchedule.getAllPossiblyClonedTaskIdsShuffled()) {
       final TaskState.State taskState = planStateManager.getTaskState(taskId);
 
       switch (taskState) {
@@ -332,7 +331,7 @@ private void doSchedule() {
     final List<Task> tasks = new ArrayList<>(taskIdsToSchedule.size());
     taskIdsToSchedule.forEach(taskId -> {
       blockManagerMaster.onProducerTaskScheduled(taskId); // Notify the block 
manager early for push edges.
-      final int taskIdx = RuntimeIdGenerator.getIndexFromTaskId(taskId);
+      final int taskIdx = RuntimeIdManager.getIndexFromTaskId(taskId);
       final int attemptIdx = planStateManager.getTaskAttempt(taskId);
       tasks.add(new Task(
           physicalPlan.getId(),
@@ -378,7 +377,7 @@ private void onTaskExecutionOnHold(final String executorId,
       executor.onTaskExecutionComplete(taskId);
       return Pair.of(executor, state);
     });
-    final String stageIdForTaskUponCompletion = 
RuntimeIdGenerator.getStageIdFromTaskId(taskId);
+    final String stageIdForTaskUponCompletion = 
RuntimeIdManager.getStageIdFromTaskId(taskId);
 
     final boolean stageComplete =
         
planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE);
@@ -458,19 +457,20 @@ private void retryTasksAndRequiredParents(final Set<String> tasks) {
   }
 
   private Set<String> getParentTasks(final String childTaskId) {
-    final String stageIdOfChildTask = 
RuntimeIdGenerator.getStageIdFromTaskId(childTaskId);
+    final String stageIdOfChildTask = 
RuntimeIdManager.getStageIdFromTaskId(childTaskId);
     return physicalPlan.getStageDAG().getIncomingEdgesOf(stageIdOfChildTask)
         .stream()
         .flatMap(inStageEdge -> {
-          final List<String> tasksOfParentStage = 
inStageEdge.getSrc().getTaskIds();
+          final List<String> tasksOfParentStage = 
inStageEdge.getSrc().getAllPossiblyClonedTaskIdsShuffled();
           switch (inStageEdge.getDataCommunicationPattern()) {
             case Shuffle:
             case BroadCast:
               // All of the parent stage's tasks are parents
               return tasksOfParentStage.stream();
             case OneToOne:
-              // Only one of the parent stage's tasks is a parent
-              return 
Stream.of(tasksOfParentStage.get(RuntimeIdGenerator.getIndexFromTaskId(childTaskId)));
+              // Only one of the parent stage's tasks (and possibly its 
clones) is a parent
+              return tasksOfParentStage.stream().filter(parentTask ->
+                  RuntimeIdManager.getIndexFromTaskId(parentTask) == 
RuntimeIdManager.getIndexFromTaskId(childTaskId));
             default:
               throw new IllegalStateException(inStageEdge.toString());
           }
@@ -484,7 +484,7 @@ private void retryTasksAndRequiredParents(final Set<String> tasks) {
    */
   private DAG<IRVertex, RuntimeEdge<IRVertex>> getVertexDagById(final String 
taskId) {
     for (final Stage stage : physicalPlan.getStageDAG().getVertices()) {
-      if 
(stage.getId().equals(RuntimeIdGenerator.getStageIdFromTaskId(taskId))) {
+      if (stage.getId().equals(RuntimeIdManager.getStageIdFromTaskId(taskId))) 
{
         return stage.getIRDAG();
       }
     }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
index ff3986e50..cdc75eb0a 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
@@ -17,7 +17,7 @@
 
 import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ResourceSiteProperty;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
@@ -57,7 +57,7 @@ public boolean testSchedulability(final ExecutorRepresenter executor, final Task
     }
     try {
       return executor.getNodeName().equals(
-          getNodeName(propertyValue, 
RuntimeIdGenerator.getIndexFromTaskId(task.getTaskId())));
+          getNodeName(propertyValue, 
RuntimeIdManager.getIndexFromTaskId(task.getTaskId())));
     } catch (final IllegalStateException e) {
       throw new RuntimeException(String.format("Cannot schedule %s", 
task.getTaskId(), e));
     }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraint.java
index 236453fc0..b4d26a833 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraint.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraint.java
@@ -18,7 +18,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
 import 
edu.snu.nemo.common.ir.vertex.executionproperty.ResourceSkewedDataProperty;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.data.HashRange;
 import edu.snu.nemo.runtime.common.data.KeyRange;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
@@ -43,7 +43,7 @@ public SkewnessAwareSchedulingConstraint() {
   }
 
   public boolean hasSkewedData(final Task task) {
-    final int taskIdx = 
RuntimeIdGenerator.getIndexFromTaskId(task.getTaskId());
+    final int taskIdx = RuntimeIdManager.getIndexFromTaskId(task.getTaskId());
     for (StageEdge inEdge : task.getTaskIncomingEdges()) {
       final KeyRange hashRange = inEdge.getTaskIdxToKeyRange().get(taskIdx);
       if (((HashRange) hashRange).isSkewed()) {
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
index d355aa73d..3ffb8c6d4 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
@@ -19,7 +19,7 @@
 import 
edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
 import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
 import 
edu.snu.nemo.common.ir.vertex.executionproperty.ResourceLocalityProperty;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
@@ -60,8 +60,9 @@ private SourceLocationAwareSchedulingConstraint(final BlockManagerMaster blockMa
           
physicalStageEdge.getPropertyValue(CommunicationPatternProperty.class)
               .orElseThrow(() -> new RuntimeException("No comm pattern!")))) {
         final String blockIdToRead =
-            RuntimeIdGenerator.generateBlockId(physicalStageEdge.getId(),
-                RuntimeIdGenerator.getIndexFromTaskId(task.getTaskId()));
+            RuntimeIdManager.generateBlockId(physicalStageEdge.getId(),
+                RuntimeIdManager.getIndexFromTaskId(task.getTaskId()),
+                RuntimeIdManager.getCloneOffsetFromTaskId(task.getTaskId()));
         final BlockManagerMaster.BlockLocationRequestHandler locationHandler =
             blockManagerMaster.getBlockLocationHandler(blockIdToRead);
         if (locationHandler.getLocationFuture().isDone()) { // if the location 
is known.
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
index e5b362ed4..623c04391 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
@@ -15,14 +15,13 @@
  */
 package edu.snu.nemo.runtime.master;
 
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.exception.AbsentBlockException;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
 import edu.snu.nemo.runtime.common.state.BlockState;
 import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.Tang;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -36,6 +35,7 @@
  * Test for {@link BlockManagerMaster}.
  */
 public final class BlockManagerMasterTest {
+  private static int CLONE_OFFSET = 0;
   private BlockManagerMaster blockManagerMaster;
 
   @Before
@@ -78,11 +78,11 @@ private static void checkPendingFuture(final Future<String> future) {
    */
   @Test
   public void testLostAfterCommit() throws Exception {
-    final String edgeId = RuntimeIdGenerator.generateStageEdgeId("Edge-0");
+    final String edgeId = RuntimeIdManager.generateStageEdgeId("Edge0");
     final int srcTaskIndex = 0;
-    final String taskId = RuntimeIdGenerator.generateTaskId(srcTaskIndex, 
"Stage-test");
-    final String executorId = RuntimeIdGenerator.generateExecutorId();
-    final String blockId = RuntimeIdGenerator.generateBlockId(edgeId, 
srcTaskIndex);
+    final String taskId = RuntimeIdManager.generateTaskId("Stagetest", 
srcTaskIndex, CLONE_OFFSET);
+    final String executorId = RuntimeIdManager.generateExecutorId();
+    final String blockId = RuntimeIdManager.generateBlockId(edgeId, 
srcTaskIndex, CLONE_OFFSET);
 
     // Initially the block state is NOT_AVAILABLE.
     blockManagerMaster.initializeState(blockId, taskId);
@@ -111,11 +111,11 @@ public void testLostAfterCommit() throws Exception {
    */
   @Test
   public void testBeforeAfterCommit() throws Exception {
-    final String edgeId = RuntimeIdGenerator.generateStageEdgeId("Edge-1");
+    final String edgeId = RuntimeIdManager.generateStageEdgeId("Edge1");
     final int srcTaskIndex = 0;
-    final String taskId = RuntimeIdGenerator.generateTaskId(srcTaskIndex, 
"Stage-Test");
-    final String executorId = RuntimeIdGenerator.generateExecutorId();
-    final String blockId = RuntimeIdGenerator.generateBlockId(edgeId, 
srcTaskIndex);
+    final String taskId = RuntimeIdManager.generateTaskId("StageTest", 
srcTaskIndex, CLONE_OFFSET);
+    final String executorId = RuntimeIdManager.generateExecutorId();
+    final String blockId = RuntimeIdManager.generateBlockId(edgeId, 
srcTaskIndex, CLONE_OFFSET);
 
     // The block is being scheduled.
     blockManagerMaster.initializeState(blockId, taskId);
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/PlanStateManagerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/PlanStateManagerTest.java
index 20031538f..061b8ab3e 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/PlanStateManagerTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/PlanStateManagerTest.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master;
 
 import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
@@ -75,14 +75,12 @@ public void testPhysicalPlanStateChanges() throws Exception {
 
     for (int stageIdx = 0; stageIdx < stageList.size(); stageIdx++) {
       final Stage stage = stageList.get(stageIdx);
-      final List<String> taskIds = stage.getTaskIds();
+      final List<String> taskIds = stage.getAllPossiblyClonedTaskIdsShuffled();
       taskIds.forEach(taskId -> {
         planStateManager.onTaskStateChanged(taskId, TaskState.State.EXECUTING);
         planStateManager.onTaskStateChanged(taskId, TaskState.State.COMPLETE);
-        if (RuntimeIdGenerator.getIndexFromTaskId(taskId) == taskIds.size() - 
1) {
-          assertEquals(StageState.State.COMPLETE, 
planStateManager.getStageState(stage.getId()));
-        }
       });
+      assertEquals(StageState.State.COMPLETE, 
planStateManager.getStageState(stage.getId()));
       taskIds.forEach(taskId -> 
assertEquals(planStateManager.getTaskState(taskId), TaskState.State.COMPLETE));
 
       if (stageIdx == stageList.size() - 1) {
@@ -111,7 +109,7 @@ public void testWaitUntilFinish() throws Exception {
     // Complete the plan and check the result again.
     // It has to return COMPLETE.
     final List<String> tasks = 
physicalPlan.getStageDAG().getTopologicalSort().stream()
-        .flatMap(stage -> stage.getTaskIds().stream())
+        .flatMap(stage -> stage.getAllPossiblyClonedTaskIdsShuffled().stream())
         .collect(Collectors.toList());
     tasks.forEach(taskId -> planStateManager.onTaskStateChanged(taskId, 
TaskState.State.EXECUTING));
     tasks.forEach(taskId -> planStateManager.onTaskStateChanged(taskId, 
TaskState.State.COMPLETE));
diff --git 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
index d31965bd2..bb006fd3f 100644
--- 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
+++ 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
@@ -46,7 +46,7 @@ static void completeStage(final PlanStateManager 
planStateManager,
         // Stage has completed, so we break out of the loop.
         break;
       } else if (StageState.State.INCOMPLETE == stageState) {
-        stage.getTaskIds().forEach(taskId -> {
+        stage.getAllPossiblyClonedTaskIdsShuffled().forEach(taskId -> {
           final TaskState.State taskState = 
planStateManager.getTaskState(taskId);
           if (TaskState.State.EXECUTING == taskState) {
             sendTaskStateEventToScheduler(scheduler, executorRegistry, taskId,
diff --git 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraintTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraintTest.java
index 87c933f4b..ed6e05ccf 100644
--- 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraintTest.java
+++ 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraintTest.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.data.HashRange;
 import edu.snu.nemo.runtime.common.data.KeyRange;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
@@ -38,6 +38,7 @@
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ExecutorRepresenter.class, Task.class, HashRange.class, 
StageEdge.class})
 public final class SkewnessAwareSchedulingConstraintTest {
+  private static final int CLONE_OFFSET = 0;
 
   private static StageEdge mockStageEdge() {
     final Map<Integer, KeyRange> taskIdxToKeyRange = new HashMap<>();
@@ -61,7 +62,7 @@ private static StageEdge mockStageEdge() {
 
   private static Task mockTask(final int taskIdx, final List<StageEdge> 
inEdges) {
     final Task task = mock(Task.class);
-    
when(task.getTaskId()).thenReturn(RuntimeIdGenerator.generateTaskId(taskIdx, 
"Stage-0"));
+    
when(task.getTaskId()).thenReturn(RuntimeIdManager.generateTaskId("Stage0", 
taskIdx, CLONE_OFFSET));
     when(task.getTaskIncomingEdges()).thenReturn(inEdges);
     return task;
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to