sanha closed pull request #5: [NEMO-27] Element Wise Block Write
URL: https://github.com/apache/incubator-nemo/pull/5
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/common/src/main/java/edu/snu/nemo/common/test/ExampleTestUtil.java b/common/src/main/java/edu/snu/nemo/common/test/ExampleTestUtil.java
index 22c1127a..8ad8d4d7 100644
--- a/common/src/main/java/edu/snu/nemo/common/test/ExampleTestUtil.java
+++ b/common/src/main/java/edu/snu/nemo/common/test/ExampleTestUtil.java
@@ -21,9 +21,9 @@
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 /**
  * Test Utils for Examples.
@@ -42,11 +42,12 @@ private ExampleTestUtil() {
    * @param resourcePath root folder for both resources.
    * @param outputFileName output file name.
    * @param testResourceFileName the test result file name.
+   * @return error message if the output is invalid.
    * @throws IOException IOException while testing.
    */
-  public static void ensureOutputValidity(final String resourcePath,
-                                          final String outputFileName,
-                                          final String testResourceFileName)
+  public static Optional<String> ensureOutputValidity(final String resourcePath,
+                                                      final String outputFileName,
+                                                      final String testResourceFileName)
   throws IOException {
     final String testOutput = Files.list(Paths.get(resourcePath))
         .filter(Files::isRegularFile)
@@ -74,8 +75,9 @@ public static void ensureOutputValidity(final String resourcePath,
               + "\n=============" + testResourceFileName + "=================="
               + resourceOutput
               + "\n===============================";
-      throw new RuntimeException(outputMsg);
+      return Optional.of(outputMsg);
     }
+    return Optional.empty();
   }
 
   /**
@@ -86,11 +88,12 @@ public static void ensureOutputValidity(final String resourcePath,
    * @param resourcePath path to resources.
    * @param outputFileName name of output file.
    * @param testResourceFileName name of the file to compare the outputs to.
+   * @return error message if the output is invalid.
    * @throws IOException exception.
    */
-  public static void ensureALSOutputValidity(final String resourcePath,
-                                             final String outputFileName,
-                                             final String testResourceFileName) throws IOException {
+  public static Optional<String> ensureALSOutputValidity(final String resourcePath,
+                                                         final String outputFileName,
+                                                         final String testResourceFileName) throws IOException {
     final List<List<Double>> testOutput = Files.list(Paths.get(resourcePath))
         .filter(Files::isRegularFile)
        .filter(path -> path.getFileName().toString().startsWith(outputFileName))
@@ -115,18 +118,19 @@ public static void ensureALSOutputValidity(final String resourcePath,
         .collect(Collectors.toList());
 
     if (testOutput.size() != resourceOutput.size()) {
-      throw new RuntimeException("output mismatch");
+      return Optional.of("output mismatch");
     }
 
-    IntStream.range(0, testOutput.size()).forEach(i -> {
-          IntStream.range(0, testOutput.get(i).size()).forEach(j -> {
-            final Double testElement = testOutput.get(i).get(j);
-            final Double resourceElement = resourceOutput.get(i).get(j);
-            if (Math.abs(testElement - resourceElement) / resourceElement > ERROR) {
-              throw new RuntimeException("output mismatch");
-            }
-          });
-        });
+    for (int i = 0; i < testOutput.size(); i++) {
+      for (int j = 0; j < testOutput.get(i).size(); j++) {
+        final Double testElement = testOutput.get(i).get(j);
+        final Double resourceElement = resourceOutput.get(i).get(j);
+        if (Math.abs(testElement - resourceElement) / resourceElement > ERROR) {
+          return Optional.of("output mismatch");
+        }
+      }
+    }
+    return Optional.empty();
   }
 
   /**
@@ -146,6 +150,4 @@ public static void deleteOutputFile(final String directory,
       Files.delete(outputFilePath);
     }
   }
-
-
 }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java
index 51ca6195..918dec4f 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java
@@ -15,6 +15,7 @@
  */
 package edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping;
 
+import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
@@ -54,7 +55,8 @@ public DataSkewReshapingPass() {
       if (v instanceof OperatorVertex && dag.getIncomingEdgesOf(v).stream().anyMatch(irEdge ->
           DataCommunicationPatternProperty.Value.Shuffle
               .equals(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern)))) {
-        final MetricCollectionBarrierVertex metricCollectionBarrierVertex = new MetricCollectionBarrierVertex();
+        final MetricCollectionBarrierVertex<Pair<Integer, Long>> metricCollectionBarrierVertex
+            = new MetricCollectionBarrierVertex<>();
         metricCollectionVertices.add(metricCollectionBarrierVertex);
         builder.addVertex(v);
         builder.addVertex(metricCollectionBarrierVertex);
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
index 7eac5f55..3f2cb0ee 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
@@ -27,6 +27,8 @@
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.util.Optional;
+
 /**
  * Test Alternating Least Square program with JobLauncher.
  */
@@ -52,8 +54,12 @@ public void setUp() throws Exception {
 
   @After
   public void tearDown() throws Exception {
-    ExampleTestUtil.ensureALSOutputValidity(fileBasePath, outputFileName, testResourceFileName);
+    final Optional<String> errorMsg =
+        ExampleTestUtil.ensureALSOutputValidity(fileBasePath, outputFileName, testResourceFileName);
     ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+    if (errorMsg.isPresent()) {
+      throw new RuntimeException(errorMsg.get());
+    }
   }
 
   @Test (timeout = TIMEOUT)
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/BroadcastITCase.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/BroadcastITCase.java
index ec1dbc59..f6314e42 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/BroadcastITCase.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/BroadcastITCase.java
@@ -27,6 +27,8 @@
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.util.Optional;
+
 /**
  * Test Broadcast program with JobLauncher.
  */
@@ -50,8 +52,12 @@ public void setUp() throws Exception {
 
   @After
   public void tearDown() throws Exception {
-    ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, testResourceFileName);
+    final Optional<String> errorMsg =
+        ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, testResourceFileName);
     ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+    if (errorMsg.isPresent()) {
+      throw new RuntimeException(errorMsg.get());
+    }
   }
 
   @Test (timeout = TIMEOUT)
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java
index 9ff51004..83ef755e 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java
@@ -26,6 +26,8 @@
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.util.Optional;
+
 /**
  * Test MapReduce program with JobLauncher.
  */
@@ -50,8 +52,12 @@ public void setUp() throws Exception {
 
   @After
   public void tearDown() throws Exception {
-    ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, testResourceFileName);
+    final Optional<String> errorMsg =
+        ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, testResourceFileName);
     ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+    if (errorMsg.isPresent()) {
+      throw new RuntimeException(errorMsg.get());
+    }
   }
 
   @Test (timeout = TIMEOUT)
diff --git a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkITCase.java b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkITCase.java
index bb050dcd..75352738 100644
--- a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkITCase.java
+++ b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkITCase.java
@@ -28,6 +28,8 @@
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.util.Optional;
+
 /**
  * Test Spark programs with JobLauncher.
  */
@@ -59,8 +61,12 @@ public void testSparkWordCount() throws Exception {
         .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
         .build());
 
-    ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, testResourceFilename);
+    final Optional<String> errorMsg =
+        ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, testResourceFilename);
     ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+    if (errorMsg.isPresent()) {
+      throw new RuntimeException(errorMsg.get());
+    }
   }
 
   @Test(timeout = TIMEOUT)
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RuntimeOptimizer.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RuntimeOptimizer.java
index ffcf784d..8e002806 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RuntimeOptimizer.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RuntimeOptimizer.java
@@ -15,6 +15,7 @@
  */
 package edu.snu.nemo.runtime.common.optimizer;
 
+import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
@@ -45,12 +46,13 @@ public static synchronized PhysicalPlan dynamicOptimization(
     final DynamicOptimizationProperty.Value dynamicOptimizationType =
         metricCollectionBarrierVertex.getProperty(ExecutionProperty.Key.DynamicOptimizationType);
 
-    if (dynamicOptimizationType.equals(DynamicOptimizationProperty.Value.DataSkewRuntimePass)) {
+    switch (dynamicOptimizationType) {
+      case DataSkewRuntimePass:
        // Map between a partition ID to corresponding metric data (e.g., the size of each block).
-        final Map<String, List<Long>> metricData = metricCollectionBarrierVertex.getMetricData();
+        final Map<String, List<Pair<Integer, Long>>> metricData = metricCollectionBarrierVertex.getMetricData();
         return new DataSkewRuntimePass().apply(originalPlan, metricData);
-    } else {
-      return originalPlan;
+      default:
+        throw new UnsupportedOperationException("Unknown runtime pass: " + dynamicOptimizationType);
     }
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index 5a15be46..ba5713ad 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -16,6 +16,7 @@
 package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
 
 import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
@@ -38,7 +39,7 @@
 /**
  * Dynamic optimization pass for handling data skew.
  */
-public final class DataSkewRuntimePass implements RuntimePass<Map<String, List<Long>>> {
+public final class DataSkewRuntimePass implements RuntimePass<Map<String, List<Pair<Integer, Long>>>> {
   private static final Logger LOG = LoggerFactory.getLogger(DataSkewRuntimePass.class.getName());
   private final Set<Class<? extends RuntimeEventHandler>> eventHandlers;
 
@@ -56,7 +57,7 @@ public DataSkewRuntimePass() {
   }
 
   @Override
-  public PhysicalPlan apply(final PhysicalPlan originalPlan, final Map<String, List<Long>> metricData) {
+  public PhysicalPlan apply(final PhysicalPlan originalPlan, final Map<String, List<Pair<Integer, Long>>> metricData) {
     // Builder to create new stages.
     final DAGBuilder<PhysicalStage, PhysicalStageEdge> physicalDAGBuilder =
         new DAGBuilder<>(originalPlan.getStageDAG());
@@ -90,26 +91,40 @@ public PhysicalPlan apply(final PhysicalPlan originalPlan, final Map<String, Lis
 
   /**
   * Method for calculating key ranges to evenly distribute the skewed metric data.
-   * @param metricData the metric data.
+   *
+   * @param metricData        the metric data.
    * @param taskGroupListSize the size of the task group list.
-   * @return  the list of key ranges calculated.
+   * @return the list of key ranges calculated.
    */
   @VisibleForTesting
-  public List<KeyRange> calculateHashRanges(final Map<String, List<Long>> metricData,
+  public List<KeyRange> calculateHashRanges(final Map<String, List<Pair<Integer, Long>>> metricData,
                                             final Integer taskGroupListSize) {
     // NOTE: metricData is made up of a map of blockId to blockSizes.
     // Count the hash range (number of blocks for each block).
-    final int hashRangeCount = metricData.values().stream().findFirst().orElseThrow(() ->
-        new DynamicOptimizationException("no valid metric data.")).size();
+    final int maxHashValue = metricData.values().stream()
+        .map(list -> list.stream()
+            .map(pair -> pair.left())
+            .max(Integer::compareTo)
+            .orElseThrow(() -> new DynamicOptimizationException("Cannot find max hash value in a block.")))
+        .max(Integer::compareTo)
+        .orElseThrow(() -> new DynamicOptimizationException("Cannot find max hash value among blocks."));
 
     // Aggregate metric data.
-    final List<Long> aggregatedMetricData = new ArrayList<>(hashRangeCount);
+    final Map<Integer, Long> aggregatedMetricData = new HashMap<>(maxHashValue);
     // for each hash range index, we aggregate the metric data.
-    IntStream.range(0, hashRangeCount).forEach(i ->
-        aggregatedMetricData.add(i, metricData.values().stream().mapToLong(lst -> lst.get(i)).sum()));
+    metricData.forEach((blockId, pairs) -> {
+      pairs.forEach(pair -> {
+        final int key = pair.left();
+        if (aggregatedMetricData.containsKey(key)) {
+          aggregatedMetricData.compute(key, (existKey, existValue) -> existValue + pair.right());
+        } else {
+          aggregatedMetricData.put(key, pair.right());
+        }
+      });
+    });
 
     // Do the optimization using the information derived above.
-    final Long totalSize = aggregatedMetricData.stream().mapToLong(n -> n).sum(); // get total size
+    final Long totalSize = aggregatedMetricData.values().stream().mapToLong(n -> n).sum(); // get total size
     final Long idealSizePerTaskGroup = totalSize / taskGroupListSize; // and derive the ideal size per task group
     LOG.info("idealSizePerTaskgroup {} = {}(totalSize) / {}(taskGroupListSize)",
         idealSizePerTaskGroup, totalSize, taskGroupListSize);
@@ -118,30 +133,31 @@ public PhysicalPlan apply(final PhysicalPlan originalPlan, final Map<String, Lis
     final List<KeyRange> keyRanges = new ArrayList<>(taskGroupListSize);
     int startingHashValue = 0;
     int finishingHashValue = 1; // initial values
-    Long currentAccumulatedSize = aggregatedMetricData.get(startingHashValue);
+    Long currentAccumulatedSize = aggregatedMetricData.getOrDefault(startingHashValue, 0L);
     for (int i = 1; i <= taskGroupListSize; i++) {
       if (i != taskGroupListSize) {
         final Long idealAccumulatedSize = idealSizePerTaskGroup * i; // where we should end
         // find the point while adding up one by one.
         while (currentAccumulatedSize < idealAccumulatedSize) {
-          currentAccumulatedSize += aggregatedMetricData.get(finishingHashValue);
+          currentAccumulatedSize += aggregatedMetricData.getOrDefault(finishingHashValue, 0L);
           finishingHashValue++;
         }

-        Long oneStepBack = currentAccumulatedSize - aggregatedMetricData.get(finishingHashValue - 1);
-        Long diffFromIdeal = currentAccumulatedSize - idealAccumulatedSize;
-        Long diffFromIdealOneStepBack = idealAccumulatedSize - oneStepBack;
+        final Long oneStepBack =
+            currentAccumulatedSize - aggregatedMetricData.getOrDefault(finishingHashValue - 1, 0L);
+        final Long diffFromIdeal = currentAccumulatedSize - idealAccumulatedSize;
+        final Long diffFromIdealOneStepBack = idealAccumulatedSize - oneStepBack;
         // Go one step back if we came too far.
         if (diffFromIdeal > diffFromIdealOneStepBack) {
           finishingHashValue--;
-          currentAccumulatedSize -= aggregatedMetricData.get(finishingHashValue);
+          currentAccumulatedSize -= aggregatedMetricData.getOrDefault(finishingHashValue, 0L);
         }

         // assign appropriately
         keyRanges.add(i - 1, HashRange.of(startingHashValue, finishingHashValue));
         startingHashValue = finishingHashValue;
       } else { // last one: we put the range of the rest.
-        keyRanges.add(i - 1, HashRange.of(startingHashValue, hashRangeCount));
+        keyRanges.add(i - 1, HashRange.of(startingHashValue, maxHashValue + 1));
       }
     }
     return keyRanges;
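
For illustration only (not part of the diff): a minimal, self-contained sketch of the
per-key metric aggregation and range splitting that this pass now performs. It uses
int[]{hashKey, size} pairs as a stand-in for edu.snu.nemo.common.Pair<Integer, Long>,
and the block IDs and sizes are made up:

    import java.util.*;

    public final class SkewRangeSketch {
      public static void main(final String[] args) {
        // blockId -> (hash key, partition size) pairs, mirroring the new metric format.
        final Map<String, List<int[]>> metricData = new HashMap<>();
        metricData.put("block0", Arrays.asList(new int[]{0, 10}, new int[]{1, 110}));
        metricData.put("block1", Arrays.asList(new int[]{1, 10}, new int[]{2, 10}, new int[]{3, 10}));

        // Aggregate sizes per hash key across blocks; keys may be sparse, hence a map.
        final Map<Integer, Long> aggregated = new TreeMap<>();
        metricData.values().forEach(pairs ->
            pairs.forEach(p -> aggregated.merge(p[0], (long) p[1], Long::sum)));

        final int taskGroupListSize = 2;
        final long totalSize = aggregated.values().stream().mapToLong(Long::longValue).sum();
        // aggregated = {0=10, 1=120, 2=10, 3=10}; ideal size per task group = 150 / 2 = 75.
        // The pass then walks hash values greedily, closing a key range once the running
        // sum reaches the ideal size, stepping one key back when that deviates less.
        System.out.println(aggregated + ", ideal=" + (totalSize / taskGroupListSize));
      }
    }
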
diff --git a/runtime/common/src/main/proto/ControlMessage.proto b/runtime/common/src/main/proto/ControlMessage.proto
index 7fa24e55..f3ae7b88 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -82,11 +82,16 @@ message BlockStateChangedMsg {
 
 message DataSizeMetricMsg {
    // TODO #511: Refactor metric aggregation for (general) run-rime optimization.
-    repeated int64 partitionSizeInfo = 1;
+    repeated PartitionSizeEntry partitionSize = 1;
     required string blockId = 2;
     required string srcIRVertexId = 3;
 }
 
+message PartitionSizeEntry {
+    required int32 key = 1;
+    required int64 size = 2;
+}
+
 message RequestBlockLocationMsg {
     required string executorId = 1;
     required string blockId = 2;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteOutputContext.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
index ce67b01a..ebe9d4f0 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.executor.bytetransfer;
 
 import edu.snu.nemo.runtime.executor.data.FileArea;
-import edu.snu.nemo.runtime.executor.data.SerializedPartition;
+import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.*;
 
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index d108a194..201d7fbd 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -33,6 +33,8 @@
 import edu.snu.nemo.runtime.executor.bytetransfer.ByteInputContext;
 import edu.snu.nemo.runtime.executor.bytetransfer.ByteOutputContext;
 import edu.snu.nemo.runtime.executor.bytetransfer.ByteTransfer;
+import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
+import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
 import edu.snu.nemo.runtime.executor.data.stores.BlockStore;
 import edu.snu.nemo.runtime.executor.data.stores.*;
 import org.apache.commons.lang3.SerializationUtils;
@@ -41,6 +43,7 @@
 import javax.annotation.concurrent.ThreadSafe;
 import javax.inject.Inject;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -122,7 +125,7 @@ public void createBlock(final String blockId,
 
     // First, try to fetch the block from local BlockStore.
     final Optional<Iterable<NonSerializedPartition>> optionalResultPartitions =
-        store.getPartitions(blockId, keyRange);
+        store.readPartitions(blockId, keyRange);
 
     if (optionalResultPartitions.isPresent()) {
       handleUsedData(blockStore, blockId);
@@ -219,25 +222,26 @@ public void createBlock(final String blockId,
     });
   }
 
+
   /**
-   * Store an iterable of data partitions to a block in the target {@code BlockStore}.
-   * Invariant: This should not be invoked after a block is committed.
+   * Writes an element to a block in the target {@code BlockStore}.
+   * Invariant: This should not be invoked after the block is committed.
    * Invariant: This method may not support concurrent write for a single block.
-   * Only one thread have to write at once.
+   *            Only one thread have to write at once.
    *
-   * @param blockId            of the block.
-   * @param partitions         to save to a block.
-   * @param blockStore         to store the block.
-   * @return a {@link Optional} of the size of each written block.
+   * @param blockId    the ID of the block to write.
+   * @param key        the key of the partition in the block to write the element.
+   * @param element    the element to write.
+   * @param blockStore the store contains the target block.
+   * @param <K>        the key type of the block to write.
    */
-  public Optional<List<Long>> putPartitions(final String blockId,
-                                            final Iterable<Partition> partitions,
-                                            final DataStoreProperty.Value blockStore) {
-    LOG.info("PutPartitions: {}", blockId);
+  public <K extends Serializable> void write(final String blockId,
+                                             final K key,
+                                             final Object element,
+                                             final DataStoreProperty.Value blockStore) {
     final BlockStore store = getBlockStore(blockStore);
-
     try {
-      return store.putPartitions(blockId, (Iterable) partitions);
+      store.write(blockId, key, element);
     } catch (final Exception e) {
       throw new BlockWriteException(e);
     }
@@ -246,19 +250,20 @@ public void createBlock(final String blockId,
   /**
    * Notifies that all writes for a block is end.
    *
-   * @param blockId           the ID of the block.
-   * @param blockStore        the store to save the block.
-   * @param partitionSizeInfo the size metric of partitions.
-   * @param srcIRVertexId     the IR vertex ID of the source task.
-   * @param expectedReadTotal the expected number of read for this block.
-   * @param usedDataHandling  how to handle the used block.
+   * @param blockId              the ID of the block.
+   * @param blockStore           the store to save the block.
+   * @param reportPartitionSizes whether report the size of partitions to master or not.
+   * @param srcIRVertexId        the IR vertex ID of the source task.
+   * @param expectedReadTotal    the expected number of read for this block.
+   * @param usedDataHandling     how to handle the used block.
+   * @return a {@link Optional} of the size of each written block.
    */
-  public void commitBlock(final String blockId,
-                          final DataStoreProperty.Value blockStore,
-                          final List<Long> partitionSizeInfo,
-                          final String srcIRVertexId,
-                          final int expectedReadTotal,
-                          final UsedDataHandlingProperty.Value usedDataHandling) {
+  public Optional<Long> commitBlock(final String blockId,
+                                    final DataStoreProperty.Value blockStore,
+                                    final boolean reportPartitionSizes,
+                                    final String srcIRVertexId,
+                                    final int expectedReadTotal,
+                                    final UsedDataHandlingProperty.Value usedDataHandling) {
     LOG.info("CommitBlock: {}", blockId);
     switch (usedDataHandling) {
       case Discard:
@@ -272,7 +277,7 @@ public void commitBlock(final String blockId,
     }
 
     final BlockStore store = getBlockStore(blockStore);
-    store.commitBlock(blockId);
+    final Optional<Map<Integer, Long>> partitionSizeMap = store.commitBlock(blockId);
     final ControlMessage.BlockStateChangedMsg.Builder blockStateChangedMsgBuilder =
         ControlMessage.BlockStateChangedMsg.newBuilder()
             .setExecutorId(executorId)
@@ -293,8 +298,17 @@ public void commitBlock(final String blockId,
             .setBlockStateChangedMsg(blockStateChangedMsgBuilder.build())
             .build());
 
-    if (!partitionSizeInfo.isEmpty()) {
-      // TODO #511: Refactor metric aggregation for (general) run-rime optimization.
+    if (reportPartitionSizes && partitionSizeMap.isPresent()) {
+      final List<ControlMessage.PartitionSizeEntry> partitionSizeEntries = new ArrayList<>();
+      partitionSizeMap.get().forEach((key, size) ->
+          partitionSizeEntries.add(
+              ControlMessage.PartitionSizeEntry.newBuilder()
+                  .setKey(key)
+                  .setSize(size)
+                  .build())
+      );
+
+      // TODO #4: Refactor metric aggregation for (general) run-rime optimization.
      persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
           .send(ControlMessage.Message.newBuilder()
               .setId(RuntimeIdGenerator.generateMessageId())
@@ -303,10 +317,21 @@ public void commitBlock(final String blockId,
               .setDataSizeMetricMsg(ControlMessage.DataSizeMetricMsg.newBuilder()
                   .setBlockId(blockId)
                   .setSrcIRVertexId(srcIRVertexId)
-                  .addAllPartitionSizeInfo(partitionSizeInfo)
+                  .addAllPartitionSize(partitionSizeEntries)
               )
               .build());
     }
+
+    // Return the total size of the committed block.
+    if (partitionSizeMap.isPresent()) {
+      long blockSizeTotal = 0;
+      for (final long partitionSize : partitionSizeMap.get().values()) {
+        blockSizeTotal += partitionSize;
+      }
+      return Optional.of(blockSizeTotal);
+    } else {
+      return Optional.empty();
+    }
   }
 
   /**
@@ -319,8 +344,7 @@ public void removeBlock(final String blockId,
                           final DataStoreProperty.Value blockStore) {
     LOG.info("RemoveBlock: {}", blockId);
     final BlockStore store = getBlockStore(blockStore);
-    final boolean exist;
-    exist = store.removeBlock(blockId);
+    final boolean exist = store.removeBlock(blockId);
 
     if (exist) {
       final ControlMessage.BlockStateChangedMsg.Builder blockStateChangedMsgBuilder =
@@ -370,6 +394,11 @@ public void run() {
     } // If null, just keep the data in the store.
   }
 
+  /**
+   * Gets the {@link BlockStore} from annotated value of {@link DataStoreProperty}.
+   * @param blockStore the annotated value of {@link DataStoreProperty}.
+   * @return the block store.
+   */
   private BlockStore getBlockStore(final DataStoreProperty.Value blockStore) {
     switch (blockStore) {
       case MemoryStore:
@@ -413,7 +442,7 @@ public void run() {
             }
           } else {
            final Optional<Iterable<SerializedPartition>> optionalResult = getBlockStore(blockStore)
-                .getSerializedPartitions(blockId, keyRange);
+                .readSerializedPartitions(blockId, keyRange);
            for (final SerializedPartition partition : optionalResult.get()) {
              outputContext.newOutputStream().writeSerializedPartition(partition).close();
             }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
index 68664a77..861451a5 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
@@ -18,6 +18,8 @@
 import com.google.common.io.CountingInputStream;
 import edu.snu.nemo.common.DirectByteArrayOutputStream;
 import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
+import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
 import edu.snu.nemo.runtime.executor.data.streamchainer.StreamChainer;
 import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
 
@@ -83,7 +85,8 @@ public static long serializePartition(final Coder coder,
   }
 
   /**
-   * Converts the non-serialized {@link Partition}s in an iterable to serialized {@link Partition}s.
+   * Converts the non-serialized {@link edu.snu.nemo.runtime.executor.data.partition.Partition}s
+   * in an iterable to serialized partitions.
    *
    * @param serializer          the serializer for serialization.
    * @param partitionsToConvert the partitions to convert.
@@ -114,7 +117,8 @@ public static long serializePartition(final Coder coder,
   }
 
   /**
-   * Converts the serialized {@link Partition}s in an iterable to non-serialized {@link Partition}s.
+   * Converts the serialized {@link edu.snu.nemo.runtime.executor.data.partition.Partition}s
+   * in an iterable to non-serialized partitions.
    *
    * @param serializer          the serializer for deserialization.
    * @param partitionsToConvert the partitions to convert.
@@ -131,7 +135,7 @@ public static long serializePartition(final Coder coder,
       try (final ByteArrayInputStream byteArrayInputStream =
                new ByteArrayInputStream(partitionToConvert.getData())) {
        final NonSerializedPartition<K> deserializePartition = deserializePartition(
-            partitionToConvert.getElementsTotal(), serializer, key, byteArrayInputStream);
+            partitionToConvert.getElementsCount(), serializer, key, byteArrayInputStream);
         nonSerializedPartitions.add(deserializePartition);
       }
     }
@@ -163,7 +167,8 @@ public static String blockIdToMetaFilePath(final String blockId,
   }
 
   /**
-   * Concatenates an iterable of non-serialized {@link Partition}s into a single iterable of elements.
+   * Concatenates an iterable of non-serialized {@link edu.snu.nemo.runtime.executor.data.partition.Partition}s
+   * into a single iterable of elements.
    *
    * @param partitionsToConcat the partitions to concatenate.
    * @return the concatenated iterable of all elements.
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/SerializedPartition.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/SerializedPartition.java
deleted file mode 100644
index 708f1936..00000000
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/SerializedPartition.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.executor.data;
-
-/**
- * A collection of data elements. The data is stored as an array of bytes.
- * This is a unit of read / write towards {@link edu.snu.nemo.runtime.executor.data.block.Block}s.
- * @param <K> the key type of its partitions.
- */
-public final class SerializedPartition<K> implements Partition<byte[], K> {
-  private final K key;
-  private final long elementsTotal;
-  private final byte[] serializedData;
-  private final int length;
-
-  /**
-   * Creates a serialized {@link Partition} having a specific key value.
-   *
-   * @param key            the key.
-   * @param elementsTotal  the total number of elements.
-   * @param serializedData the serialized data.
-   * @param length         the length of the actual serialized data. (It can be different with serializedData.length)
-   */
-  public SerializedPartition(final K key,
-                             final long elementsTotal,
-                             final byte[] serializedData,
-                             final int length) {
-    this.key = key;
-    this.elementsTotal = elementsTotal;
-    this.serializedData = serializedData;
-    this.length = length;
-  }
-
-  /**
-   * @return the key value.
-   */
-  @Override
-  public K getKey() {
-    return key;
-  }
-
-  /**
-   * @return whether the data in this {@link Partition} is serialized or not.
-   */
-  @Override
-  public boolean isSerialized() {
-    return true;
-  }
-
-  /**
-   * @return the serialized data.
-   */
-  @Override
-  public byte[] getData() {
-    return serializedData;
-  }
-
-  /**
-   * @return the length of the actual data.
-   */
-  public int getLength() {
-    return length;
-  }
-
-  /**
-   * @return the number of elements.
-   */
-  public long getElementsTotal() {
-    return elementsTotal;
-  }
-}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java
index fc668e5d..ead6d813 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java
@@ -16,39 +16,51 @@
 package edu.snu.nemo.runtime.executor.data.block;
 
 import edu.snu.nemo.runtime.common.data.KeyRange;
-import edu.snu.nemo.runtime.executor.data.NonSerializedPartition;
-import edu.snu.nemo.runtime.executor.data.SerializedPartition;
+import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
+import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /**
  * This interface represents a block, which is the output of a specific task.
+ *
  * @param <K> the key type of its partitions.
  */
 public interface Block<K extends Serializable> {
 
+  /**
+   * Writes an element to non-committed block.
+   * Invariant: This should not be invoked after this block is committed.
+   * Invariant: This method may not support concurrent write.
+   *
+   * @param key     the key.
+   * @param element the element to write.
+   * @throws IOException if this block is already committed.
+   */
+  void write(K key, Object element) throws IOException;
+
   /**
    * Stores {@link NonSerializedPartition}s to this block.
    * Invariant: This should not be invoked after this block is committed.
+   * Invariant: This method may not support concurrent write.
    *
    * @param partitions the {@link NonSerializedPartition}s to store.
-   * @return the size of the data per partition (only when the data is serialized in this method).
    * @throws IOException if fail to store.
    */
-  Optional<List<Long>> putPartitions(final Iterable<NonSerializedPartition<K>> partitions) throws IOException;
+  void writePartitions(Iterable<NonSerializedPartition<K>> partitions) throws IOException;
 
   /**
    * Stores {@link SerializedPartition}s to this block.
    * Invariant: This should not be invoked after this block is committed.
+   * Invariant: This method may not support concurrent write.
    *
    * @param partitions the {@link SerializedPartition}s to store.
-   * @return the size of the data per partition.
    * @throws IOException if fail to store.
    */
-  List<Long> putSerializedPartitions(final Iterable<SerializedPartition<K>> partitions) throws IOException;
+  void writeSerializedPartitions(Iterable<SerializedPartition<K>> partitions) throws IOException;
 
   /**
   * Retrieves the {@link NonSerializedPartition}s in a specific key range from this block.
@@ -59,7 +71,7 @@
    * @return an iterable of {@link NonSerializedPartition}s.
    * @throws IOException if failed to retrieve.
    */
-  Iterable<NonSerializedPartition<K>> getPartitions(final KeyRange<K> keyRange) throws IOException;
+  Iterable<NonSerializedPartition<K>> readPartitions(KeyRange<K> keyRange) throws IOException;
 
   /**
    * Retrieves the {@link SerializedPartition}s in a specific key range.
@@ -69,12 +81,13 @@
    * @return an iterable of {@link SerializedPartition}s.
    * @throws IOException if failed to retrieve.
    */
-  Iterable<SerializedPartition<K>> getSerializedPartitions(final KeyRange<K> keyRange) throws IOException;
+  Iterable<SerializedPartition<K>> readSerializedPartitions(KeyRange<K> keyRange) throws IOException;
 
   /**
    * Commits this block to prevent further write.
    *
+   * @return the size of each partition if the data in the block is serialized.
    * @throws IOException if failed to commit.
    */
-  void commit() throws IOException;
+  Optional<Map<K, Long>> commit() throws IOException;
 }
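
For illustration only (not part of the diff): roughly how a producer could use the new
element-wise contract above. The helper names (writeAll, hashOf) and the 8-way hash
partitioning are hypothetical; only Block, write, and commit come from the interface:

    import edu.snu.nemo.runtime.executor.data.block.Block;

    import java.io.IOException;
    import java.util.Map;
    import java.util.Optional;

    final class ElementWiseWriteSketch {
      // Writes each element to its partition key, then seals the block.
      static Optional<Map<Integer, Long>> writeAll(final Block<Integer> block,
                                                   final Iterable<Object> elements) throws IOException {
        for (final Object element : elements) {
          block.write(hashOf(element), element); // block buffers per-key partitions until commit
        }
        return block.commit(); // per-partition sizes (when known) feed the skew metrics
      }

      private static int hashOf(final Object element) {
        return Math.floorMod(element.hashCode(), 8); // hypothetical 8-way hash partitioning
      }
    }
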
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
index dbb5600f..34a4476b 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
@@ -17,10 +17,13 @@
 
 import edu.snu.nemo.runtime.common.data.KeyRange;
 import edu.snu.nemo.runtime.executor.data.*;
+import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
+import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
 import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
 import edu.snu.nemo.runtime.executor.data.metadata.PartitionMetadata;
 import edu.snu.nemo.runtime.executor.data.metadata.FileMetadata;
 
+import javax.annotation.concurrent.NotThreadSafe;
 import java.io.*;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -28,10 +31,14 @@
 
 /**
  * This class represents a block which is stored in (local or remote) file.
+ * Concurrent read is supported, but concurrent write is not supported.
+ *
  * @param <K> the key type of its partitions.
  */
+@NotThreadSafe
 public final class FileBlock<K extends Serializable> implements Block<K> {
 
+  private final Map<K, SerializedPartition<K>> nonCommittedPartitionsMap;
   private final Serializer serializer;
   private final String filePath;
   private final FileMetadata<K> metadata;
@@ -39,13 +46,14 @@
   /**
    * Constructor.
    *
-   * @param serializer    the {@link Serializer}.
-   * @param filePath the path of the file that this block will be stored.
-   * @param metadata the metadata for this block.
+   * @param serializer the {@link Serializer}.
+   * @param filePath   the path of the file that this block will be stored.
+   * @param metadata   the metadata for this block.
    */
   public FileBlock(final Serializer serializer,
                    final String filePath,
                    final FileMetadata<K> metadata) {
+    this.nonCommittedPartitionsMap = new HashMap<>();
     this.serializer = serializer;
     this.filePath = filePath;
     this.metadata = metadata;
@@ -54,54 +62,81 @@ public FileBlock(final Serializer serializer,
   /**
   * Writes the serialized data of this block having a specific key value as a partition to the file
    * where this block resides.
-   * Invariant: This method does not support concurrent write for a single block.
-   *            Only one thread have to write at once.
+   * Invariant: This method does not support concurrent write.
    *
   * @param serializedPartitions the iterable of the serialized partitions to write.
    * @throws IOException if fail to write.
    */
-  private void writeSerializedPartitions(final 
Iterable<SerializedPartition<K>> serializedPartitions)
+  private void writeToFile(final Iterable<SerializedPartition<K>> 
serializedPartitions)
       throws IOException {
     try (final FileOutputStream fileOutputStream = new FileOutputStream(filePath, true)) {
       for (final SerializedPartition<K> serializedPartition : serializedPartitions) {
         // Reserve a partition write and get the metadata.
         metadata.writePartitionMetadata(
-            serializedPartition.getKey(), serializedPartition.getLength(), serializedPartition.getElementsTotal());
+            serializedPartition.getKey(), serializedPartition.getLength(), serializedPartition.getElementsCount());
         fileOutputStream.write(serializedPartition.getData(), 0, serializedPartition.getLength());
       }
     }
   }
 
+  /**
+   * Writes an element to non-committed block.
+   * Invariant: This should not be invoked after this block is committed.
+   * Invariant: This method does not support concurrent write.
+   *
+   * @param key     the key.
+   * @param element the element to write.
+   * @throws IOException if this block is already committed.
+   */
+  @Override
+  public void write(final K key,
+                    final Object element) throws IOException {
+    if (metadata.isCommitted()) {
+      throw new IOException("The partition is already committed!");
+    } else {
+      SerializedPartition<K> partition = nonCommittedPartitionsMap.get(key);
+      if (partition == null) {
+        partition = new SerializedPartition<>(key, serializer);
+        nonCommittedPartitionsMap.put(key, partition);
+      }
+      partition.write(element);
+    }
+  }
+
   /**
    * Writes {@link NonSerializedPartition}s to this block.
+   * Invariant: This method does not support concurrent write.
    *
    * @param partitions the {@link NonSerializedPartition}s to write.
    * @throws IOException if fail to write.
    */
   @Override
-  public Optional<List<Long>> putPartitions(final Iterable<NonSerializedPartition<K>> partitions) throws IOException {
-    final Iterable<SerializedPartition<K>> convertedPartitions =
-        DataUtil.convertToSerPartitions(serializer, partitions);
-
-    return Optional.of(putSerializedPartitions(convertedPartitions));
+  public void writePartitions(final Iterable<NonSerializedPartition<K>> partitions)
+      throws IOException {
+    if (metadata.isCommitted()) {
+      throw new IOException("The partition is already committed!");
+    } else {
+      final Iterable<SerializedPartition<K>> convertedPartitions =
+          DataUtil.convertToSerPartitions(serializer, partitions);
+      writeSerializedPartitions(convertedPartitions);
+    }
   }
 
   /**
    * Writes {@link SerializedPartition}s to this block.
+   * Invariant: This method does not support concurrent write.
    *
    * @param partitions the {@link SerializedPartition}s to store.
    * @throws IOException if fail to store.
    */
   @Override
-  public synchronized List<Long> putSerializedPartitions(final Iterable<SerializedPartition<K>> partitions)
+  public void writeSerializedPartitions(final Iterable<SerializedPartition<K>> partitions)
       throws IOException {
-    final List<Long> partitionSizeList = new ArrayList<>();
-    for (final SerializedPartition serializedPartition : partitions) {
-      partitionSizeList.add((long) serializedPartition.getLength());
+    if (metadata.isCommitted()) {
+      throw new IOException("The partition is already committed!");
+    } else {
+      writeToFile(partitions);
     }
-    writeSerializedPartitions(partitions);
-
-    return partitionSizeList;
   }
 
   /**
@@ -112,40 +147,44 @@ private void writeSerializedPartitions(final Iterable<SerializedPartition<K>> se
    * @throws IOException if failed to retrieve.
    */
   @Override
-  public Iterable<NonSerializedPartition<K>> getPartitions(final KeyRange keyRange) throws IOException {
-    // Deserialize the data
-    final List<NonSerializedPartition<K>> deserializedPartitions = new ArrayList<>();
-    try (final FileInputStream fileStream = new FileInputStream(filePath)) {
-      for (final PartitionMetadata<K> partitionMetadata : metadata.getPartitionMetadataIterable()) {
-        final K key = partitionMetadata.getKey();
-        if (keyRange.includes(key)) {
-          // The key value of this partition is in the range.
-          final long availableBefore = fileStream.available();
-          // We need to limit read bytes on this FileStream, which could be over-read by wrapped
-          // compression stream. We recommend to wrap with LimitedInputStream once more when
-          // reading input from chained compression InputStream.
-          // Plus, this stream must be not closed to prevent to close the filtered file partition.
-          final LimitedInputStream limitedInputStream =
-              new LimitedInputStream(fileStream, partitionMetadata.getPartitionSize());
-          final NonSerializedPartition<K> deserializePartition =
-              DataUtil.deserializePartition(
-                  partitionMetadata.getElementsTotal(), serializer, key, limitedInputStream);
-          deserializedPartitions.add(deserializePartition);
-          // rearrange file pointer
-          final long toSkip = partitionMetadata.getPartitionSize() - availableBefore + fileStream.available();
-          if (toSkip > 0) {
-            skipBytes(fileStream, toSkip);
-          } else if (toSkip < 0) {
-            throw new IOException("file stream has been overread");
+  public Iterable<NonSerializedPartition<K>> readPartitions(final KeyRange keyRange) throws IOException {
+    if (!metadata.isCommitted()) {
+      throw new IOException("Cannot retrieve elements before a block is 
committed");
+    } else {
+      // Deserialize the data
+      final List<NonSerializedPartition<K>> deserializedPartitions = new ArrayList<>();
+      try (final FileInputStream fileStream = new FileInputStream(filePath)) {
+        for (final PartitionMetadata<K> partitionMetadata : metadata.getPartitionMetadataList()) {
+          final K key = partitionMetadata.getKey();
+          if (keyRange.includes(key)) {
+            // The key value of this partition is in the range.
+            final long availableBefore = fileStream.available();
+            // We need to limit read bytes on this FileStream, which could be over-read by wrapped
+            // compression stream. We recommend to wrap with LimitedInputStream once more when
+            // reading input from chained compression InputStream.
+            // Plus, this stream must be not closed to prevent to close the filtered file partition.
+            final LimitedInputStream limitedInputStream =
+                new LimitedInputStream(fileStream, partitionMetadata.getPartitionSize());
+            final NonSerializedPartition<K> deserializePartition =
+                DataUtil.deserializePartition(
+                    partitionMetadata.getElementsTotal(), serializer, key, limitedInputStream);
+            deserializedPartitions.add(deserializePartition);
+            // rearrange file pointer
+            final long toSkip = partitionMetadata.getPartitionSize() - availableBefore + fileStream.available();
+            if (toSkip > 0) {
+              skipBytes(fileStream, toSkip);
+            } else if (toSkip < 0) {
+              throw new IOException("file stream has been overread");
+            }
+          } else {
+            // Have to skip this partition.
+            skipBytes(fileStream, partitionMetadata.getPartitionSize());
           }
-        } else {
-          // Have to skip this partition.
-          skipBytes(fileStream, partitionMetadata.getPartitionSize());
         }
       }
-    }
 
-    return deserializedPartitions;
+      return deserializedPartitions;
+    }
   }
 
   /**
@@ -157,29 +196,33 @@ private void writeSerializedPartitions(final Iterable<SerializedPartition<K>> se
    * @throws IOException if failed to retrieve.
    */
   @Override
-  public Iterable<SerializedPartition<K>> getSerializedPartitions(final KeyRange keyRange) throws IOException {
-    // Deserialize the data
-    final List<SerializedPartition<K>> partitionsInRange = new ArrayList<>();
-    try (final FileInputStream fileStream = new FileInputStream(filePath)) {
-      for (final PartitionMetadata<K> partitionmetadata : metadata.getPartitionMetadataIterable()) {
-        final K key = partitionmetadata.getKey();
-        if (keyRange.includes(key)) {
-          // The hash value of this partition is in the range.
-          final byte[] serializedData = new byte[partitionmetadata.getPartitionSize()];
-          final int readBytes = fileStream.read(serializedData);
-          if (readBytes != serializedData.length) {
-            throw new IOException("The read data size does not match with the 
partition size.");
+  public Iterable<SerializedPartition<K>> readSerializedPartitions(final KeyRange keyRange) throws IOException {
+    if (!metadata.isCommitted()) {
+      throw new IOException("Cannot retrieve elements before a block is 
committed");
+    } else {
+      // Deserialize the data
+      final List<SerializedPartition<K>> partitionsInRange = new ArrayList<>();
+      try (final FileInputStream fileStream = new FileInputStream(filePath)) {
+        for (final PartitionMetadata<K> partitionmetadata : metadata.getPartitionMetadataList()) {
+          final K key = partitionmetadata.getKey();
+          if (keyRange.includes(key)) {
+            // The hash value of this partition is in the range.
+            final byte[] serializedData = new byte[partitionmetadata.getPartitionSize()];
+            final int readBytes = fileStream.read(serializedData);
+            if (readBytes != serializedData.length) {
+              throw new IOException("The read data size does not match with 
the partition size.");
+            }
+            partitionsInRange.add(new SerializedPartition<>(
+                key, partitionmetadata.getElementsTotal(), serializedData, serializedData.length));
+          } else {
+            // Have to skip this partition.
+            skipBytes(fileStream, partitionmetadata.getPartitionSize());
           }
-          partitionsInRange.add(new SerializedPartition<>(
-              key, partitionmetadata.getElementsTotal(), serializedData, serializedData.length));
-        } else {
-          // Have to skip this partition.
-          skipBytes(fileStream, partitionmetadata.getPartitionSize());
         }
       }
-    }
 
-    return partitionsInRange;
+      return partitionsInRange;
+    }
   }
 
   /**
@@ -209,13 +252,17 @@ private void skipBytes(final InputStream inputStream,
    * @throws IOException if failed to open a file channel
    */
   public List<FileArea> asFileAreas(final KeyRange keyRange) throws IOException {
-    final List<FileArea> fileAreas = new ArrayList<>();
-    for (final PartitionMetadata<K> partitionMetadata : metadata.getPartitionMetadataIterable()) {
-      if (keyRange.includes(partitionMetadata.getKey())) {
-        fileAreas.add(new FileArea(filePath, partitionMetadata.getOffset(), partitionMetadata.getPartitionSize()));
+    if (!metadata.isCommitted()) {
+      throw new IOException("Cannot retrieve elements before a block is 
committed");
+    } else {
+      final List<FileArea> fileAreas = new ArrayList<>();
+      for (final PartitionMetadata<K> partitionMetadata : metadata.getPartitionMetadataList()) {
+        if (keyRange.includes(partitionMetadata.getKey())) {
+          fileAreas.add(new FileArea(filePath, partitionMetadata.getOffset(), partitionMetadata.getPartitionSize()));
+        }
       }
+      return fileAreas;
     }
-    return fileAreas;
   }
 
   /**
@@ -233,9 +280,34 @@ public void deleteFile() throws IOException {
 
   /**
    * Commits this block to prevent further write.
+   *
+   * @return the size of each partition.
+   * @throws IOException if failed to commit.
    */
   @Override
-  public void commit() throws IOException {
-    metadata.commitBlock();
+  public synchronized Optional<Map<K, Long>> commit() throws IOException {
+    if (!metadata.isCommitted()) {
+      final List<SerializedPartition<K>> partitions = new ArrayList<>();
+      for (final SerializedPartition<K> partition : nonCommittedPartitionsMap.values()) {
+        partition.commit();
+        partitions.add(partition);
+      }
+      writeToFile(partitions);
+      nonCommittedPartitionsMap.clear();
+      metadata.commitBlock();
+    }
+    final List<PartitionMetadata<K>> partitionMetadataList = metadata.getPartitionMetadataList();
+    final Map<K, Long> partitionSizes = new HashMap<>(partitionMetadataList.size());
+    for (final PartitionMetadata<K> partitionMetadata : partitionMetadataList) {
+      final K key = partitionMetadata.getKey();
+      final long partitionSize = partitionMetadata.getPartitionSize();
+      if (partitionSizes.containsKey(key)) {
+        partitionSizes.compute(key,
+            (existingKey, existingValue) -> existingValue + partitionSize);
+      } else {
+        partitionSizes.put(key, partitionSize);
+      }
+    }
+    return Optional.of(partitionSizes);
   }
 }
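
For illustration only (not part of the diff): the containsKey/compute aggregation in
FileBlock.commit() above can also be expressed with Map.merge; the keys and sizes below
are made-up stand-ins for PartitionMetadata entries:

    import java.util.HashMap;
    import java.util.Map;

    final class PartitionSizeMergeSketch {
      public static void main(final String[] args) {
        final Map<String, Long> partitionSizes = new HashMap<>();
        final Object[][] metadataEntries = {{"key1", 10L}, {"key1", 5L}, {"key2", 7L}};
        for (final Object[] entry : metadataEntries) {
          // Partitions sharing a key have their sizes summed into one entry.
          partitionSizes.merge((String) entry[0], (Long) entry[1], Long::sum);
        }
        System.out.println(partitionSizes); // e.g. {key1=15, key2=7}
      }
    }
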
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
index 31508e16..94d22535 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
@@ -17,25 +17,26 @@
 
 import edu.snu.nemo.runtime.common.data.KeyRange;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
-import edu.snu.nemo.runtime.executor.data.NonSerializedPartition;
-import edu.snu.nemo.runtime.executor.data.SerializedPartition;
+import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
+import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
 import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
 
-import javax.annotation.concurrent.ThreadSafe;
+import javax.annotation.concurrent.NotThreadSafe;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
+import java.util.*;
 
 /**
  * This class represents a block which is stored in local memory and not 
serialized.
+ * Concurrent read is supported, but concurrent write is not supported.
+ *
  * @param <K> the key type of its partitions.
  */
-@ThreadSafe
+@NotThreadSafe
 public final class NonSerializedMemoryBlock<K extends Serializable> implements 
Block<K> {
 
   private final List<NonSerializedPartition<K>> nonSerializedPartitions;
+  private final Map<K, NonSerializedPartition<K>> nonCommittedPartitionsMap;
   private final Serializer serializer;
   private volatile boolean committed;
 
@@ -46,27 +47,47 @@
    */
   public NonSerializedMemoryBlock(final Serializer serializer) {
     this.nonSerializedPartitions = new ArrayList<>();
+    this.nonCommittedPartitionsMap = new HashMap<>();
     this.serializer = serializer;
     this.committed = false;
   }
 
+  /**
+   * Writes an element to a non-committed block.
+   * Invariant: This should not be invoked after this block is committed.
+   * Invariant: This method does not support concurrent write.
+   *
+   * @param key     the key.
+   * @param element the element to write.
+   * @throws IOException if this block is already committed.
+   */
+  @Override
+  public void write(final K key,
+                    final Object element) throws IOException {
+    if (committed) {
+      throw new IOException("The partition is already committed!");
+    } else {
+      final NonSerializedPartition<K> partition =
+          nonCommittedPartitionsMap.computeIfAbsent(key, absentKey -> new 
NonSerializedPartition<>(key));
+      partition.write(element);
+    }
+  }
+
   /**
    * Stores {@link NonSerializedPartition}s to this block.
    * Invariant: This should not be invoked after this block is committed.
+   * Invariant: This method does not support concurrent write.
    *
    * @param partitions the {@link NonSerializedPartition}s to store.
    * @throws IOException if fail to store.
    */
   @Override
-  public synchronized Optional<List<Long>> putPartitions(final 
Iterable<NonSerializedPartition<K>> partitions)
-      throws IOException {
+  public void writePartitions(final Iterable<NonSerializedPartition<K>> 
partitions) throws IOException {
     if (!committed) {
       partitions.forEach(nonSerializedPartitions::add);
     } else {
       throw new IOException("Cannot append partition to the committed block");
     }
-
-    return Optional.empty();
   }
 
   /**
@@ -74,21 +95,17 @@ public NonSerializedMemoryBlock(final Serializer 
serializer) {
    * Because all data in this block is stored in a non-serialized form,
    * the data in these partitions have to be deserialized.
    * Invariant: This should not be invoked after this block is committed.
+   * Invariant: This method does not support concurrent write.
    *
    * @param partitions the {@link SerializedPartition}s to store.
    * @throws IOException if fail to store.
    */
   @Override
-  public synchronized List<Long> putSerializedPartitions(final 
Iterable<SerializedPartition<K>> partitions)
-      throws IOException {
+  public void writeSerializedPartitions(final Iterable<SerializedPartition<K>> 
partitions) throws IOException {
     if (!committed) {
       final Iterable<NonSerializedPartition<K>> convertedPartitions =
           DataUtil.convertToNonSerPartitions(serializer, partitions);
-      final List<Long> dataSizePerPartition = new ArrayList<>();
-      partitions.forEach(serializedPartition -> 
dataSizePerPartition.add((long) serializedPartition.getData().length));
-      putPartitions(convertedPartitions);
-
-      return dataSizePerPartition;
+      writePartitions(convertedPartitions);
     } else {
       throw new IOException("Cannot append partitions to the committed block");
     }
@@ -103,14 +120,14 @@ public NonSerializedMemoryBlock(final Serializer 
serializer) {
    * @throws IOException if failed to retrieve.
    */
   @Override
-  public Iterable<NonSerializedPartition<K>> getPartitions(final KeyRange 
keyRange) throws IOException {
+  public Iterable<NonSerializedPartition<K>> readPartitions(final KeyRange 
keyRange) throws IOException {
     if (committed) {
       // Retrieves data in the hash range from the target block
       final List<NonSerializedPartition<K>> retrievedPartitions = new 
ArrayList<>();
       nonSerializedPartitions.forEach(partition -> {
         final K key = partition.getKey();
         if (keyRange.includes(key)) {
-          retrievedPartitions.add(new NonSerializedPartition(key, 
partition.getData()));
+          retrievedPartitions.add(partition);
         }
       });
 
@@ -130,15 +147,25 @@ public NonSerializedMemoryBlock(final Serializer 
serializer) {
    * @throws IOException if failed to retrieve.
    */
   @Override
-  public Iterable<SerializedPartition<K>> getSerializedPartitions(final 
KeyRange keyRange) throws IOException {
-    return DataUtil.convertToSerPartitions(serializer, 
getPartitions(keyRange));
+  public Iterable<SerializedPartition<K>> readSerializedPartitions(final 
KeyRange keyRange) throws IOException {
+    return DataUtil.convertToSerPartitions(serializer, 
readPartitions(keyRange));
   }
 
   /**
    * Commits this block to prevent further write.
+   *
+   * @return empty optional because the data is not serialized.
    */
   @Override
-  public synchronized void commit() {
-    committed = true;
+  public synchronized Optional<Map<K, Long>> commit() {
+    if (!committed) {
+      nonCommittedPartitionsMap.forEach((key, partition) -> {
+        partition.commit();
+        nonSerializedPartitions.add(partition);
+      });
+      nonCommittedPartitionsMap.clear();
+      committed = true;
+    }
+    return Optional.empty();
   }
 }
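
All three block implementations follow the write-then-commit gating that
NonSerializedMemoryBlock shows above: writes are buffered per key in a
non-committed map, rejected once the block is sealed, and moved into the
committed collection on commit(). A self-contained toy version of the pattern
(plain Java, not the Nemo classes themselves):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    final class ToyBlock<K> {
      private final Map<K, List<Object>> nonCommitted = new HashMap<>();
      private final List<List<Object>> committedPartitions = new ArrayList<>();
      private volatile boolean committed = false;

      // Mirrors Block#write: buffer the element under its key, unless sealed.
      void write(final K key, final Object element) throws IOException {
        if (committed) {
          throw new IOException("The block is already committed!");
        }
        nonCommitted.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
      }

      // Mirrors Block#commit: move buffered partitions over and seal (idempotent).
      synchronized void commit() {
        if (!committed) {
          committedPartitions.addAll(nonCommitted.values());
          nonCommitted.clear();
          committed = true;
        }
      }
    }
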
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
index ec6ee77b..266ea933 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
@@ -17,25 +17,26 @@
 
 import edu.snu.nemo.runtime.common.data.KeyRange;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
-import edu.snu.nemo.runtime.executor.data.NonSerializedPartition;
-import edu.snu.nemo.runtime.executor.data.SerializedPartition;
+import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
+import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
 import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
 
 import javax.annotation.concurrent.ThreadSafe;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
+import java.util.*;
 
 /**
  * This class represents a block which is serialized and stored in local 
memory.
+ * Concurrent read is supported, but concurrent write is not supported.
+ *
  * @param <K> the key type of its partitions.
  */
 @ThreadSafe
 public final class SerializedMemoryBlock<K extends Serializable> implements 
Block<K> {
 
   private final List<SerializedPartition<K>> serializedPartitions;
+  private final Map<K, SerializedPartition<K>> nonCommittedPartitionsMap;
   private final Serializer serializer;
   private volatile boolean committed;
 
@@ -45,26 +46,50 @@
    * @param serializer the {@link Serializer}.
    */
   public SerializedMemoryBlock(final Serializer serializer) {
+    this.serializedPartitions = new ArrayList<>();
+    this.nonCommittedPartitionsMap = new HashMap<>();
     this.serializer = serializer;
-    serializedPartitions = new ArrayList<>();
-    committed = false;
+    this.committed = false;
+  }
+
+  /**
+   * Writes an element to a non-committed block.
+   * Invariant: This should not be invoked after this block is committed.
+   * Invariant: This method does not support concurrent write.
+   *
+   * @param key     the key.
+   * @param element the element to write.
+   * @throws IOException if this block is already committed.
+   */
+  @Override
+  public void write(final K key,
+                    final Object element) throws IOException {
+    if (committed) {
+      throw new IOException("The partition is already committed!");
+    } else {
+      SerializedPartition<K> partition = nonCommittedPartitionsMap.get(key);
+      if (partition == null) {
+        partition = new SerializedPartition<>(key, serializer);
+        nonCommittedPartitionsMap.put(key, partition);
+      }
+      partition.write(element);
+    }
   }
 
   /**
   * Serializes and stores {@link NonSerializedPartition}s to this block.
    * Invariant: This should not be invoked after this block is committed.
+   * Invariant: This method does not support concurrent write.
    *
    * @param partitions the {@link NonSerializedPartition}s to store.
-   * @return the size of the data per partition.
    * @throws IOException if fail to store.
    */
   @Override
-  public synchronized Optional<List<Long>> putPartitions(final 
Iterable<NonSerializedPartition<K>> partitions)
-      throws IOException {
+  public void writePartitions(final Iterable<NonSerializedPartition<K>> 
partitions) throws IOException {
     if (!committed) {
       final Iterable<SerializedPartition<K>> convertedPartitions = 
DataUtil.convertToSerPartitions(
           serializer, partitions);
-      return Optional.of(putSerializedPartitions(convertedPartitions));
+      writeSerializedPartitions(convertedPartitions);
     } else {
       throw new IOException("Cannot append partitions to the committed block");
     }
@@ -73,21 +98,15 @@ public SerializedMemoryBlock(final Serializer serializer) {
   /**
    * Stores {@link SerializedPartition}s to this block.
    * Invariant: This should not be invoked after this block is committed.
+   * Invariant: This method does not support concurrent write.
    *
    * @param partitions the {@link SerializedPartition}s to store.
    * @throws IOException if fail to store.
    */
   @Override
-  public synchronized List<Long> putSerializedPartitions(final 
Iterable<SerializedPartition<K>> partitions)
-      throws IOException {
+  public void writeSerializedPartitions(final Iterable<SerializedPartition<K>> 
partitions) throws IOException {
     if (!committed) {
-      final List<Long> partitionSizeList = new ArrayList<>();
-      partitions.forEach(serializedPartition -> {
-        partitionSizeList.add((long) serializedPartition.getLength());
-        serializedPartitions.add(serializedPartition);
-      });
-
-      return partitionSizeList;
+      partitions.forEach(serializedPartitions::add);
     } else {
       throw new IOException("Cannot append partitions to the committed block");
     }
@@ -103,8 +122,8 @@ public SerializedMemoryBlock(final Serializer serializer) {
    * @throws IOException if failed to retrieve.
    */
   @Override
-  public Iterable<NonSerializedPartition<K>> getPartitions(final KeyRange 
keyRange) throws IOException {
-    return DataUtil.convertToNonSerPartitions(serializer, 
getSerializedPartitions(keyRange));
+  public Iterable<NonSerializedPartition<K>> readPartitions(final KeyRange 
keyRange) throws IOException {
+    return DataUtil.convertToNonSerPartitions(serializer, 
readSerializedPartitions(keyRange));
   }
 
   /**
@@ -116,7 +135,7 @@ public SerializedMemoryBlock(final Serializer serializer) {
    * @throws IOException if failed to retrieve.
    */
   @Override
-  public Iterable<SerializedPartition<K>> getSerializedPartitions(final 
KeyRange keyRange) throws IOException {
+  public Iterable<SerializedPartition<K>> readSerializedPartitions(final 
KeyRange keyRange) throws IOException {
     if (committed) {
       final List<SerializedPartition<K>> partitionsInRange = new ArrayList<>();
       serializedPartitions.forEach(serializedPartition -> {
@@ -135,9 +154,31 @@ public SerializedMemoryBlock(final Serializer serializer) {
 
   /**
    * Commits this block to prevent further write.
+   *
+   * @return the size of each partition.
+   * @throws IOException if failed to commit.
    */
   @Override
-  public synchronized void commit() {
-    committed = true;
+  public synchronized Optional<Map<K, Long>> commit() throws IOException {
+    if (!committed) {
+      for (final SerializedPartition<K> partition : 
nonCommittedPartitionsMap.values()) {
+        partition.commit();
+        serializedPartitions.add(partition);
+      }
+      nonCommittedPartitionsMap.clear();
+      committed = true;
+    }
+    final Map<K, Long> partitionSizes = new 
HashMap<>(serializedPartitions.size());
+    for (final SerializedPartition<K> serializedPartition : 
serializedPartitions) {
+      final K key = serializedPartition.getKey();
+      final long partitionSize = serializedPartition.getLength();
+      if (partitionSizes.containsKey(key)) {
+        partitionSizes.compute(key,
+            (existingKey, existingValue) -> existingValue + partitionSize);
+      } else {
+        partitionSizes.put(key, partitionSize);
+      }
+    }
+    return Optional.of(partitionSizes);
   }
 }
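
Note the asymmetry in the new commit() contract: SerializedMemoryBlock and
FileBlock return Optional.of(partitionSizes) because the byte sizes are known,
while NonSerializedMemoryBlock returns Optional.empty(). A caller therefore
has to treat the sizes as optional metrics; a small sketch of consuming that
result (plain Java; the map literal stands in for a real commit() return):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    final class CommitResultDemo {
      public static void main(final String[] args) {
        // A serialized block would return per-key sizes; a non-serialized one
        // returns Optional.empty(), meaning "sizes unknown".
        final Map<Integer, Long> fromCommit = new HashMap<>();
        fromCommit.put(0, 15L);
        fromCommit.put(1, 7L);
        final Optional<Map<Integer, Long>> sizes = Optional.of(fromCommit);
        sizes.ifPresent(map -> map.forEach(
            (key, bytes) -> System.out.println("partition " + key + ": " + bytes + " bytes")));
      }
    }
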
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
index a43d04e7..df33b514 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
@@ -75,13 +75,13 @@ public final synchronized void writePartitionMetadata(final 
K key,
   }
 
   /**
-   * Gets a iterable containing the partition metadata of corresponding block.
+   * Gets a list containing the partition metadata of the corresponding block.
    *
-   * @return the iterable containing the partition metadata.
+   * @return the list containing the partition metadata.
   * @throws IOException if fail to get the list.
    */
-  public final Iterable<PartitionMetadata<K>> getPartitionMetadataIterable() 
throws IOException {
-    return Collections.unmodifiableCollection(partitionMetadataList);
+  public final List<PartitionMetadata<K>> getPartitionMetadataList() throws 
IOException {
+    return Collections.unmodifiableList(partitionMetadataList);
   }
 
   /**
@@ -105,4 +105,11 @@ public final synchronized void 
writePartitionMetadata(final K key,
   protected final void setCommitted(final boolean committed) {
     this.committed.set(committed);
   }
+
+  /**
+   * @return whether this file is committed or not.
+   */
+  public final boolean isCommitted() {
+    return committed.get();
+  }
 }
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/RemoteFileMetadata.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/RemoteFileMetadata.java
index 8f98aa88..8cc9c972 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/RemoteFileMetadata.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/RemoteFileMetadata.java
@@ -72,7 +72,7 @@ public void deleteMetadata() throws IOException {
    */
   @Override
   public synchronized void commitBlock() throws IOException {
-    final Iterable<PartitionMetadata<K>> partitionMetadataItr = 
getPartitionMetadataIterable();
+    final Iterable<PartitionMetadata<K>> partitionMetadataItr = 
getPartitionMetadataList();
     try (
         final FileOutputStream metafileOutputStream = new 
FileOutputStream(metaFilePath, false);
         final DataOutputStream dataOutputStream = new 
DataOutputStream(metafileOutputStream)
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/NonSerializedPartition.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/NonSerializedPartition.java
similarity index 61%
rename from 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/NonSerializedPartition.java
rename to 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/NonSerializedPartition.java
index e7c56ef8..0739b0cb 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/NonSerializedPartition.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/NonSerializedPartition.java
@@ -13,7 +13,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.executor.data;
+package edu.snu.nemo.runtime.executor.data.partition;
+
+import edu.snu.nemo.runtime.executor.data.DataUtil;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * A collection of data elements. The data is stored as an iterable of 
elements.
@@ -22,37 +28,66 @@
  */
 public final class NonSerializedPartition<K> implements Partition<Iterable, K> 
{
   private final K key;
-  private final Iterable nonSerializedData;
+  private final List nonSerializedData;
   private final long numSerializedBytes;
   private final long numEncodedBytes;
+  private volatile boolean committed;
 
   /**
-   * Creates a non-serialized {@link Partition} having a specific key value.
+   * Creates a non-serialized {@link Partition} without actual data.
+   * Data can be written to this partition until it is committed.
    *
-   * @param key  the key.
-   * @param data the non-serialized data.
+   * @param key the key of this partition.
    */
-  public NonSerializedPartition(final K key,
-                                final Iterable data) {
-    this(key, data, -1, -1);
+  public NonSerializedPartition(final K key) {
+    this.key = key;
+    this.nonSerializedData = new ArrayList();
+    this.numSerializedBytes = -1;
+    this.numEncodedBytes = -1;
+    this.committed = false;
   }
 
   /**
-   * Creates a non-serialized {@link Partition} having a specific key value.
+   * Creates a non-serialized {@link Partition} with actual data.
+   * Data cannot be written to this partition after construction.
    *
-   * @param key  the key.
-   * @param data the non-serialized data.
+   * @param key                the key.
+   * @param data               the non-serialized data.
    * @param numSerializedBytes the number of bytes in serialized form (which 
is, for example, encoded and compressed)
-   * @param numEncodedBytes the number of bytes in encoded form (which is 
ready to be decoded)
+   * @param numEncodedBytes    the number of bytes in encoded form (which is 
ready to be decoded)
    */
   public NonSerializedPartition(final K key,
-                                final Iterable data,
+                                final List data,
                                 final long numSerializedBytes,
                                 final long numEncodedBytes) {
     this.key = key;
     this.nonSerializedData = data;
     this.numSerializedBytes = numSerializedBytes;
     this.numEncodedBytes = numEncodedBytes;
+    this.committed = true;
+  }
+
+  /**
+   * Writes an element to a non-committed partition.
+   *
+   * @param element element to write.
+   * @throws IOException if the partition is already committed.
+   */
+  @Override
+  public void write(final Object element) throws IOException {
+    if (committed) {
+      throw new IOException("The partition is already committed!");
+    } else {
+      nonSerializedData.add(element);
+    }
+  }
+
+  /**
+   * Commits a partition to prevent further data write.
+   */
+  @Override
+  public void commit() {
+    this.committed = true;
   }
 
   /**
@@ -97,9 +132,14 @@ public boolean isSerialized() {
 
   /**
    * @return the non-serialized data.
+   * @throws IOException if the partition is not committed yet.
    */
   @Override
-  public Iterable getData() {
-    return nonSerializedData;
+  public Iterable getData() throws IOException {
+    if (!committed) {
+      throw new IOException("The partition is not committed yet!");
+    } else {
+      return nonSerializedData;
+    }
   }
 }
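
The NonSerializedPartition above makes the new partition lifecycle concrete:
construct empty, write, commit, then read; getData() before commit now throws.
A short sketch exercising exactly that contract (IOException handling elided;
assumes the partition classes from this diff are on the classpath, and the raw
Iterable follows the class's own signature):

    // Construct an empty, writable partition for key 0.
    final NonSerializedPartition<Integer> partition = new NonSerializedPartition<>(0);
    partition.write("a");
    partition.write("b");
    // Calling partition.getData() here would throw: not committed yet.
    partition.commit();
    final Iterable data = partition.getData(); // now yields "a", "b"
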
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/Partition.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/Partition.java
similarity index 67%
rename from 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/Partition.java
rename to 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/Partition.java
index 7885b898..b9f717e0 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/Partition.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/Partition.java
@@ -13,7 +13,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.executor.data;
+package edu.snu.nemo.runtime.executor.data.partition;
+
+import java.io.IOException;
 
 /**
  * A collection of data elements.
@@ -23,6 +25,20 @@
  */
 public interface Partition<T, K> {
 
+  /**
+   * Writes an element to a non-committed partition.
+   *
+   * @param element element to write.
+   * @throws IOException if the partition is already committed.
+   */
+  void write(Object element) throws IOException;
+
+  /**
+   * Commits a partition to prevent further data write.
+   * @throws IOException if fail to commit partition.
+   */
+  void commit() throws IOException;
+
   /**
    * @return the key value.
    */
@@ -35,6 +51,7 @@
 
   /**
    * @return the data in this {@link Partition}.
+   * @throws IOException if the partition is not committed yet.
    */
-  T getData();
+  T getData() throws IOException;
 }
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
new file mode 100644
index 00000000..8a771317
--- /dev/null
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
@@ -0,0 +1,175 @@
+/*
+ * Copyright (C) 2017 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.executor.data.partition;
+
+import edu.snu.nemo.common.DirectByteArrayOutputStream;
+import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import static edu.snu.nemo.runtime.executor.data.DataUtil.buildOutputStream;
+
+/**
+ * A collection of data elements. The data is stored as an array of bytes.
+ * This is a unit of read / write towards {@link 
edu.snu.nemo.runtime.executor.data.block.Block}s.
+ * @param <K> the key type of this partition.
+ */
+public final class SerializedPartition<K> implements Partition<byte[], K> {
+  private final K key;
+  private volatile long elementsCount;
+  private volatile byte[] serializedData;
+  private volatile int length;
+  private volatile boolean committed;
+  // These fields are null when the partition is created in an already-committed state.
+  @Nullable private final DirectByteArrayOutputStream bytesOutputStream;
+  @Nullable private final OutputStream wrappedStream;
+  @Nullable private final Coder coder;
+
+  /**
+   * Creates a serialized {@link Partition} without actual data.
+   * Data can be written to this partition until it is committed.
+   *
+   * @param key        the key of this partition.
+   * @param serializer the serializer to be used to serialize data.
+   * @throws IOException if fail to chain the output stream.
+   */
+  public SerializedPartition(final K key,
+                             final Serializer serializer) throws IOException {
+    this.key = key;
+    this.elementsCount = 0;
+    this.serializedData = new byte[0];
+    this.length = 0;
+    this.committed = false;
+    this.bytesOutputStream = new DirectByteArrayOutputStream();
+    this.wrappedStream = buildOutputStream(bytesOutputStream, 
serializer.getStreamChainers());
+    this.coder = serializer.getCoder();
+  }
+
+  /**
+   * Creates a serialized {@link Partition} with actual data.
+   * Data cannot be written to this partition after construction.
+   *
+   * @param key            the key.
+   * @param elementsTotal  the total number of elements.
+   * @param serializedData the serialized data.
+   * @param length         the length of the actual serialized data. (It can 
differ from serializedData.length.)
+   */
+  public SerializedPartition(final K key,
+                             final long elementsTotal,
+                             final byte[] serializedData,
+                             final int length) {
+    this.key = key;
+    this.elementsCount = elementsTotal;
+    this.serializedData = serializedData;
+    this.length = length;
+    this.committed = true;
+    this.bytesOutputStream = null;
+    this.wrappedStream = null;
+    this.coder = null;
+  }
+
+  /**
+   * Writes an element to a non-committed partition.
+   *
+   * @param element element to write.
+   * @throws IOException if the partition is already committed.
+   */
+  @Override
+  public void write(final Object element) throws IOException {
+    if (committed) {
+      throw new IOException("The partition is already committed!");
+    } else {
+      try {
+        coder.encode(element, wrappedStream);
+        elementsCount++;
+      } catch (final IOException e) {
+        // Close the stream before propagating the failure so that the caller
+        // does not mistake a failed write for a successful one.
+        wrappedStream.close();
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Commits a partition to prevent further data write.
+   * @throws IOException if fail to commit partition.
+   */
+  @Override
+  public void commit() throws IOException {
+    if (!committed) {
+      // We need to close wrappedStream here, because 
DirectByteArrayOutputStream#getBufDirectly() returns
+      // the inner buffer directly, which can otherwise be an unfinished (not 
yet flushed) buffer.
+      wrappedStream.close();
+      this.serializedData = bytesOutputStream.getBufDirectly();
+      this.length = bytesOutputStream.getCount();
+      this.committed = true;
+    }
+  }
+
+  /**
+   * @return the key value.
+   */
+  @Override
+  public K getKey() {
+    return key;
+  }
+
+  /**
+   * @return whether the data in this {@link Partition} is serialized or not.
+   */
+  @Override
+  public boolean isSerialized() {
+    return true;
+  }
+
+  /**
+   * @return the serialized data.
+   * @throws IOException if the partition is not committed yet.
+   */
+  @Override
+  public byte[] getData() throws IOException {
+    if (!committed) {
+      throw new IOException("The partition is not committed yet!");
+    } else {
+      return serializedData;
+    }
+  }
+
+  /**
+   * @return the length of the actual data.
+   * @throws IOException if the partition is not committed yet.
+   */
+  public int getLength() throws IOException {
+    if (!committed) {
+      throw new IOException("The partition is not committed yet!");
+    } else {
+      return length;
+    }
+  }
+
+  /**
+   * @return the number of elements.
+   * @throws IOException if the partition is not committed yet.
+   */
+  public long getElementsCount() throws IOException {
+    if (!committed) {
+      throw new IOException("The partition is not committed yet!");
+    } else {
+      return elementsCount;
+    }
+  }
+}
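
The flush-before-read rule that commit() enforces above is worth spelling out:
a chained output stream may hold bytes that never reach the underlying buffer
until the chain is flushed or closed. The same effect is reproducible with
plain JDK streams standing in for Nemo's stream chainers; a runnable sketch:

    import java.io.BufferedOutputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    final class FlushBeforeReadDemo {
      public static void main(final String[] args) throws IOException {
        final ByteArrayOutputStream inner = new ByteArrayOutputStream();
        final BufferedOutputStream wrapped = new BufferedOutputStream(inner);
        wrapped.write(new byte[]{1, 2, 3});
        // The wrapper still buffers the bytes; the inner stream sees nothing yet.
        System.out.println("before close: " + inner.size()); // 0
        wrapped.close();
        System.out.println("after close:  " + inner.size()); // 3
      }
    }
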
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/DataSkewHashPartitioner.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/DataSkewHashPartitioner.java
index 5971bf44..8af20e81 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/DataSkewHashPartitioner.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/DataSkewHashPartitioner.java
@@ -16,15 +16,10 @@
 package edu.snu.nemo.runtime.executor.data.partitioner;
 
 import edu.snu.nemo.common.KeyExtractor;
-import edu.snu.nemo.runtime.executor.data.Partition;
-import edu.snu.nemo.runtime.executor.data.NonSerializedPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.IntStream;
 
 /**
  * An implementation of {@link Partitioner} which hashes output data from a 
source task appropriate to detect data skew.
@@ -36,38 +31,35 @@
  * to prevent the extra deserialize - rehash - serialize process.
  * For more information, please check {@link 
edu.snu.nemo.conf.JobConf.HashRangeMultiplier}.
  */
-public final class DataSkewHashPartitioner implements Partitioner {
+public final class DataSkewHashPartitioner implements Partitioner<Integer> {
   private static final Logger LOG = 
LoggerFactory.getLogger(DataSkewHashPartitioner.class.getName());
-  private final int hashRangeMultiplier; // Hash range multiplier.
+  private final KeyExtractor keyExtractor;
+  private final BigInteger hashRangeBase;
+  private final int hashRange;
 
-  public DataSkewHashPartitioner(final int hashRangeMultiplier) {
-    this.hashRangeMultiplier = hashRangeMultiplier;
-  }
-
-  @Override
-  public List<Partition> partition(final Iterable elements,
-                                   final int dstParallelism,
-                                   final KeyExtractor keyExtractor) {
+  /**
+   * Constructor.
+   *
+   * @param hashRangeMultiplier the hash range multiplier.
+   * @param dstParallelism      the number of destination tasks.
+   * @param keyExtractor        the key extractor that extracts keys from 
elements.
+   */
+  public DataSkewHashPartitioner(final int hashRangeMultiplier,
+                                 final int dstParallelism,
+                                 final KeyExtractor keyExtractor) {
+    this.keyExtractor = keyExtractor;
     // For this hash range, please check the description of 
HashRangeMultiplier in JobConf.
     // For actual hash range to use, we calculate a prime number right next to 
the desired hash range.
-    final BigInteger hashRangeBase = new 
BigInteger(String.valueOf(dstParallelism * hashRangeMultiplier));
-    final int hashRange = hashRangeBase.nextProbablePrime().intValue();
-
+    this.hashRangeBase = new BigInteger(String.valueOf(dstParallelism * 
hashRangeMultiplier));
+    this.hashRange = hashRangeBase.nextProbablePrime().intValue();
     LOG.info("hashRangeBase {} resulting hashRange {}", hashRangeBase, 
hashRange);
+  }
 
-    // Separate the data into partitions according to the hash value of their 
key.
-    final List<List> elementsByKey = new ArrayList<>(hashRange);
-    IntStream.range(0, hashRange).forEach(hashVal -> elementsByKey.add(new 
ArrayList<>()));
-    elements.forEach(element -> {
-      // Hash the data by its key, and "modulo" by the hash range.
-      final int hashVal = Math.abs(keyExtractor.extractKey(element).hashCode() 
% hashRange);
-      elementsByKey.get(hashVal).add(element);
-    });
-
-    final List<Partition> partitions = new ArrayList<>(hashRange);
-    for (int hashIdx = 0; hashIdx < hashRange; hashIdx++) {
-      partitions.add(new NonSerializedPartition(hashIdx, 
elementsByKey.get(hashIdx)));
-    }
-    return partitions;
+  /**
+   * @see Partitioner#partition(Object).
+   */
+  @Override
+  public Integer partition(final Object element) {
+    return Math.abs(keyExtractor.extractKey(element).hashCode() % hashRange);
   }
 }
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/HashPartitioner.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/HashPartitioner.java
index 8f81bc4f..4494e180 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/HashPartitioner.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/HashPartitioner.java
@@ -16,36 +16,30 @@
 package edu.snu.nemo.runtime.executor.data.partitioner;
 
 import edu.snu.nemo.common.KeyExtractor;
-import edu.snu.nemo.runtime.executor.data.NonSerializedPartition;
-import edu.snu.nemo.runtime.executor.data.Partition;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.IntStream;
 
 /**
  * An implementation of {@link Partitioner} which hashes output data from a 
source task
  * according to the key of elements.
  * The data will be hashed by their key, and applied "modulo" operation by the 
number of destination tasks.
  */
-public final class HashPartitioner implements Partitioner {
+public final class HashPartitioner implements Partitioner<Integer> {
+  private final KeyExtractor keyExtractor;
+  private final int dstParallelism;
 
-  @Override
-  public List<Partition> partition(final Iterable elements,
-                                   final int dstParallelism,
-                                   final KeyExtractor keyExtractor) {
-    final List<List> elementsByKey = new ArrayList<>(dstParallelism);
-    IntStream.range(0, dstParallelism).forEach(dstTaskIdx -> 
elementsByKey.add(new ArrayList<>()));
-    elements.forEach(element -> {
-      // Hash the data by its key, and "modulo" by the number of destination 
tasks.
-      final int dstIdx = Math.abs(keyExtractor.extractKey(element).hashCode() 
% dstParallelism);
-      elementsByKey.get(dstIdx).add(element);
-    });
+  /**
+   * Constructor.
+   *
+   * @param dstParallelism the number of destination tasks.
+   * @param keyExtractor   the key extractor that extracts keys from elements.
+   */
+  public HashPartitioner(final int dstParallelism,
+                         final KeyExtractor keyExtractor) {
+    this.keyExtractor = keyExtractor;
+    this.dstParallelism = dstParallelism;
+  }
 
-    final List<Partition> partitions = new ArrayList<>(dstParallelism);
-    for (int hashIdx = 0; hashIdx < dstParallelism; hashIdx++) {
-      partitions.add(new NonSerializedPartition(hashIdx, 
elementsByKey.get(hashIdx)));
-    }
-    return partitions;
+  @Override
+  public Integer partition(final Object element) {
+    return Math.abs(keyExtractor.extractKey(element).hashCode() % 
dstParallelism);
   }
 }
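
With the stateless partition(element) design above, a writer constructs the
partitioner once and asks it for a key per element. A usage sketch (the
Object[]-pair element shape and the lambda-compatible KeyExtractor are
assumptions for illustration, not part of this diff):

    // Assumed element shape: {key, value} pairs; KeyExtractor used as a lambda.
    final Partitioner<Integer> partitioner =
        new HashPartitioner(4, element -> ((Object[]) element)[0]);
    final Object[] element = {"someKey", "someValue"};
    final int dstKey = partitioner.partition(element); // always in [0, 4)
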
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/IntactPartitioner.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/IntactPartitioner.java
index 29b5aa90..6aa95f25 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/IntactPartitioner.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/IntactPartitioner.java
@@ -15,23 +15,14 @@
  */
 package edu.snu.nemo.runtime.executor.data.partitioner;
 
-import edu.snu.nemo.common.KeyExtractor;
-import edu.snu.nemo.runtime.executor.data.NonSerializedPartition;
-import edu.snu.nemo.runtime.executor.data.Partition;
-
-import java.util.Collections;
-import java.util.List;
-
 /**
  * An implementation of {@link Partitioner} which makes an output data
- * from a source task to a single {@link Partition}.
+ * from a source task to a single {@link 
edu.snu.nemo.runtime.executor.data.partition.Partition}.
  */
-public final class IntactPartitioner implements Partitioner {
+public final class IntactPartitioner implements Partitioner<Integer> {
 
   @Override
-  public List<Partition> partition(final Iterable elements,
-                                   final int dstParallelism,
-                                   final KeyExtractor keyExtractor) {
-    return Collections.singletonList(new NonSerializedPartition(0, elements));
+  public Integer partition(final Object element) {
+    return 0;
   }
 }
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/Partitioner.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/Partitioner.java
index 906c955b..b1aefd8d 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/Partitioner.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/Partitioner.java
@@ -15,25 +15,21 @@
  */
 package edu.snu.nemo.runtime.executor.data.partitioner;
 
-import edu.snu.nemo.common.KeyExtractor;
-import edu.snu.nemo.runtime.executor.data.Partition;
-
-import java.util.List;
+import java.io.Serializable;
 
 /**
  * This interface represents the way of partitioning output data from a source 
task.
- * It takes an iterable of elements and divide the data into multiple {@link 
Partition}s,
- * according to the number of destination tasks, the key of each element, etc.
+ * It takes an element and designates the key of the {@link 
edu.snu.nemo.runtime.executor.data.partition.Partition}
+ * to which the element should be written, according to the number of 
destination tasks, the key of each element, etc.
+ * @param <K> the key type of the partition to write.
  */
-public interface Partitioner {
+public interface Partitioner<K extends Serializable> {
 
   /**
    * Divides the output data from a task into multiple blocks.
    *
-   * @param elements       the output data from a source task.
-   * @param dstParallelism the number of destination tasks.
-   * @param keyExtractor   extracts keys from elements.
-   * @return the list of partitioned blocks.
+   * @param element        the output element from a source task.
+   * @return the key of the partition in the block to which the element 
should be written.
    */
-  List<Partition> partition(Iterable elements, int dstParallelism, 
KeyExtractor keyExtractor);
+   K partition(Object element);
 }
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/AbstractBlockStore.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/AbstractBlockStore.java
index b1431fab..880ab1aa 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/AbstractBlockStore.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/AbstractBlockStore.java
@@ -40,7 +40,7 @@ protected AbstractBlockStore(final SerializerManager 
serializerManager) {
    * @param blockId the ID of the block to get the coder.
    * @return the coder.
    */
-  public final Serializer getSerializerFromWorker(final String blockId) {
+  protected final Serializer getSerializerFromWorker(final String blockId) {
     final String runtimeEdgeId = 
RuntimeIdGenerator.getRuntimeEdgeIdFromBlockId(blockId);
     return serializerManager.getSerializer(runtimeEdgeId);
   }
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/BlockStore.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/BlockStore.java
index 8be11a69..6de1c445 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/BlockStore.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/BlockStore.java
@@ -18,11 +18,11 @@
 import edu.snu.nemo.common.exception.BlockFetchException;
 import edu.snu.nemo.common.exception.BlockWriteException;
 import edu.snu.nemo.runtime.common.data.KeyRange;
-import edu.snu.nemo.runtime.executor.data.NonSerializedPartition;
-import edu.snu.nemo.runtime.executor.data.SerializedPartition;
+import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
+import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
 
 import java.io.Serializable;
-import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -35,47 +35,65 @@
    *
    * @param blockId the ID of the block to create.
    * @throws BlockWriteException for any error occurred while trying to create 
a block.
-   *         (This exception will be thrown to the scheduler
-   *          through {@link edu.snu.nemo.runtime.executor.Executor} and
-   *          have to be handled by the scheduler with fault tolerance 
mechanism.)
+   *                             (This exception will be thrown to the 
scheduler
+   *                             through {@link 
edu.snu.nemo.runtime.executor.Executor} and
+   *                             have to be handled by the scheduler with 
fault tolerance mechanism.)
    */
   void createBlock(String blockId) throws BlockWriteException;
 
   /**
-   * Saves an iterable of {@link NonSerializedPartition}s to a block.
+   * Writes an element to a non-committed block.
+   * Invariant: This method may not support concurrent write for a single 
block.
+   * Only one thread should write at a time.
+   *
+   * @param blockId the ID of the block to write the element to.
+   * @param key     the key.
+   * @param element the element to write.
+   * @param <K>     the key type of the partitions.
+   * @throws BlockWriteException for any error occurred while trying to write 
a block.
+   *                             (This exception will be thrown to the 
scheduler
+   *                             through {@link 
edu.snu.nemo.runtime.executor.Executor} and
+   *                             have to be handled by the scheduler with 
fault tolerance mechanism.)
+   */
+  <K extends Serializable> void write(String blockId,
+                                      K key,
+                                      Object element) throws 
BlockWriteException;
+
+  /**
+   * Writes an iterable of {@link NonSerializedPartition}s to a block.
    * If the block exists already, appends the data to it.
    * Invariant: This method may not support concurrent write for a single 
block.
-   *            Only one thread have to write at once.
+   * Only one thread should write at a time.
    *
    * @param blockId    of the block.
    * @param partitions to save to a block.
    * @param <K>        the key type of the partitions.
-   * @return the size of the data per partition (only when the data is 
serialized).
    * @throws BlockWriteException for any error occurred while trying to write 
a block.
-   *         (This exception will be thrown to the scheduler
-   *          through {@link edu.snu.nemo.runtime.executor.Executor} and
-   *          have to be handled by the scheduler with fault tolerance 
mechanism.)
+   *                             (This exception will be thrown to the 
scheduler
+   *                             through {@link 
edu.snu.nemo.runtime.executor.Executor} and
+   *                             have to be handled by the scheduler with 
fault tolerance mechanism.)
    */
-  <K extends Serializable> Optional<List<Long>> putPartitions(String blockId,
-                                     Iterable<NonSerializedPartition<K>> 
partitions) throws BlockWriteException;
+  <K extends Serializable> void writePartitions(String blockId,
+                                                
Iterable<NonSerializedPartition<K>> partitions)
+      throws BlockWriteException;
 
   /**
-   * Saves an iterable of {@link SerializedPartition}s to a block.
+   * Writes an iterable of {@link SerializedPartition}s to a block.
    * If the block exists already, appends the data to it.
    * Invariant: This method may not support concurrent write for a single 
block.
-   *            Only one thread have to write at once.
+   * Only one thread should write at a time.
    *
    * @param blockId    of the block.
    * @param partitions to save to a block.
    * @param <K>        the key type of the partitions.
-   * @return the size of the data per partition (only when the data is 
serialized).
    * @throws BlockWriteException for any error occurred while trying to write 
a block.
-   *         (This exception will be thrown to the scheduler
-   *          through {@link edu.snu.nemo.runtime.executor.Executor} and
-   *          have to be handled by the scheduler with fault tolerance 
mechanism.)
+   *                             (This exception will be thrown to the 
scheduler
+   *                             through {@link 
edu.snu.nemo.runtime.executor.Executor} and
+   *                             have to be handled by the scheduler with 
fault tolerance mechanism.)
    */
-  <K extends Serializable> List<Long> putSerializedPartitions(String blockId,
-                                     Iterable<SerializedPartition<K>> 
partitions) throws BlockWriteException;
+  <K extends Serializable> void writeSerializedPartitions(String blockId,
+                                                          
Iterable<SerializedPartition<K>> partitions)
+      throws BlockWriteException;
 
   /**
    * Retrieves {@link NonSerializedPartition}s.
@@ -86,38 +104,42 @@
    * @param <K>      the key type of the partitions.
    * @return the result elements from the target block (if the target block 
exists).
    * @throws BlockFetchException for any error occurred while trying to fetch 
a block.
-   *         (This exception will be thrown to the scheduler
-   *          through {@link edu.snu.nemo.runtime.executor.Executor} and
-   *          have to be handled by the scheduler with fault tolerance 
mechanism.)
+   *                             (This exception will be thrown to the 
scheduler
+   *                             through {@link 
edu.snu.nemo.runtime.executor.Executor} and
+   *                             have to be handled by the scheduler with 
fault tolerance mechanism.)
    */
-  <K extends Serializable> Optional<Iterable<NonSerializedPartition<K>>> 
getPartitions(String blockId,
-                                                           KeyRange<K> 
keyRange) throws BlockFetchException;
+  <K extends Serializable> Optional<Iterable<NonSerializedPartition<K>>> 
readPartitions(String blockId,
+                                                                               
         KeyRange<K> keyRange)
+      throws BlockFetchException;
 
   /**
    * Retrieves {@link SerializedPartition}s in a specific {@link KeyRange} 
from a block.
    *
-   * @param blockId   of the target block.
+   * @param blockId  of the target block.
    * @param keyRange the key range.
-   * @param <K> the key type of the partitions.
+   * @param <K>      the key type of the partitions.
    * @return the result elements from the target block (if the target block 
exists).
    * @throws BlockFetchException for any error occurred while trying to fetch 
a partition.
-   *         (This exception will be thrown to the scheduler
-   *          through {@link edu.snu.nemo.runtime.executor.Executor} and
-   *          have to be handled by the scheduler with fault tolerance 
mechanism.)
+   *                             (This exception will be thrown to the 
scheduler
+   *                             through {@link 
edu.snu.nemo.runtime.executor.Executor} and
+   *                             have to be handled by the scheduler with 
fault tolerance mechanism.)
    */
-  <K extends Serializable> Optional<Iterable<SerializedPartition<K>>> 
getSerializedPartitions(String blockId,
-                                                                     
KeyRange<K> keyRange) throws BlockFetchException;
+  <K extends Serializable> Optional<Iterable<SerializedPartition<K>>> 
readSerializedPartitions(String blockId,
+                                                                               
                KeyRange<K> keyRange)
+      throws BlockFetchException;
 
   /**
    * Notifies that all writes for a block is end.
    *
    * @param blockId of the block.
+   * @param <K>     the key type of the partitions.
+   * @return the size of each partition if the data in the block is serialized.
    * @throws BlockWriteException if fail to commit.
-   *         (This exception will be thrown to the scheduler
-   *          through {@link edu.snu.nemo.runtime.executor.Executor} and
-   *          have to be handled by the scheduler with fault tolerance 
mechanism.)
+   *                             (This exception will be thrown to the 
scheduler
+   *                             through {@link 
edu.snu.nemo.runtime.executor.Executor} and
+   *                             have to be handled by the scheduler with 
fault tolerance mechanism.)
    */
-  void commitBlock(String blockId) throws BlockWriteException;
+  <K extends Serializable> Optional<Map<K, Long>> commitBlock(String blockId) 
throws BlockWriteException;
 
   /**
    * Removes a block of data.
@@ -125,9 +147,9 @@
    * @param blockId of the block.
    * @return whether the partition exists or not.
    * @throws BlockFetchException for any error occurred while trying to remove 
a block.
-   *         (This exception will be thrown to the scheduler
-   *          through {@link edu.snu.nemo.runtime.executor.Executor} and
-   *          have to be handled by the scheduler with fault tolerance 
mechanism.)
+   *                             (This exception will be thrown to the 
scheduler
+   *                             through {@link 
edu.snu.nemo.runtime.executor.Executor} and
+   *                             have to be handled by the scheduler with 
fault tolerance mechanism.)
    */
-  Boolean removeBlock(String blockId);
+  boolean removeBlock(String blockId);
 }
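
Taken together, the BlockStore changes replace the old bulk putPartitions path
with: create the block, write elements one at a time under partitioner-chosen
keys, then commit. A sketch of that flow against the interfaces in this diff
(blockStore, blockId, dstParallelism, keyExtractor, and elements are assumed
to exist; exception handling elided):

    blockStore.createBlock(blockId);
    final Partitioner<Integer> partitioner =
        new HashPartitioner(dstParallelism, keyExtractor);
    for (final Object element : elements) {
      // One call per element; the store buffers it in the matching partition.
      blockStore.write(blockId, partitioner.partition(element), element);
    }
    // Seal the block; stores with serialized data report per-key sizes here.
    final Optional<Map<Integer, Long>> partitionSizes = blockStore.commitBlock(blockId);
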
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/GlusterFileStore.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/GlusterFileStore.java
index ef48b54f..41ea964c 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/GlusterFileStore.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/GlusterFileStore.java
@@ -21,6 +21,8 @@
 import edu.snu.nemo.runtime.common.data.KeyRange;
 import edu.snu.nemo.runtime.executor.data.*;
 import edu.snu.nemo.runtime.executor.data.block.Block;
+import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
+import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
 import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
 import edu.snu.nemo.runtime.executor.data.metadata.RemoteFileMetadata;
 import edu.snu.nemo.runtime.executor.data.block.FileBlock;
@@ -83,37 +85,53 @@ public void createBlock(final String blockId) {
   }
 
   /**
-   * @see BlockStore#putPartitions(String, Iterable)
+   * @see BlockStore#write(String, Serializable, Object).
    */
   @Override
-  public <K extends Serializable>
-  Optional<List<Long>> putPartitions(final String blockId,
-                                     final Iterable<NonSerializedPartition<K>> 
partitions)
+  public <K extends Serializable> void write(final String blockId,
+                                             final K key,
+                                             final Object element) throws 
BlockWriteException {
+    try {
+      final Block<K> block = blockMap.get(blockId);
+      if (block == null) {
+        throw new BlockWriteException(new Throwable("The block " + blockId + 
"is not created yet."));
+      }
+      block.write(key, element);
+    } catch (final IOException e) {
+      throw new BlockWriteException(new Throwable("Failed to store partitions 
to this block."));
+    }
+  }
+
+  /**
+   * @see BlockStore#writePartitions(String, Iterable)
+   */
+  @Override
+  public <K extends Serializable> void writePartitions(final String blockId,
+                                                       final 
Iterable<NonSerializedPartition<K>> partitions)
       throws BlockWriteException {
     try {
       final Block<K> block = blockMap.get(blockId);
       if (block == null) {
         throw new BlockWriteException(new Throwable("The block " + blockId + 
"is not created yet."));
       }
-      return block.putPartitions(partitions);
+      block.writePartitions(partitions);
     } catch (final IOException e) {
       throw new BlockWriteException(new Throwable("Failed to store partitions 
to this block."));
     }
   }
 
   /**
-   * @see BlockStore#putSerializedPartitions(String, Iterable)
+   * @see BlockStore#writeSerializedPartitions(String, Iterable)
    */
   @Override
-  public <K extends Serializable>
-  List<Long> putSerializedPartitions(final String blockId,
-                                     final Iterable<SerializedPartition<K>> 
partitions) {
+  public <K extends Serializable> void writeSerializedPartitions(final String 
blockId,
+                                                                 final 
Iterable<SerializedPartition<K>> partitions) {
     try {
       final Block<K> block = blockMap.get(blockId);
       if (block == null) {
         throw new BlockWriteException(new Throwable("The block " + blockId + 
"is not created yet."));
       }
-      return block.putSerializedPartitions(partitions);
+      block.writeSerializedPartitions(partitions);
     } catch (final IOException e) {
       throw new BlockWriteException(new Throwable("Failed to store partitions 
to this block."));
     }
@@ -122,10 +140,10 @@ public void createBlock(final String blockId) {
   /**
    * Retrieves {@link NonSerializedPartition}s in a specific {@link KeyRange} 
from a block.
    *
-   * @see BlockStore#getPartitions(String, KeyRange)
+   * @see BlockStore#readPartitions(String, KeyRange)
    */
   @Override
-  public <K extends Serializable> 
Optional<Iterable<NonSerializedPartition<K>>> getPartitions(
+  public <K extends Serializable> 
Optional<Iterable<NonSerializedPartition<K>>> readPartitions(
       final String blockId,
       final KeyRange<K> keyRange) throws BlockFetchException {
     final String filePath = DataUtil.blockIdToFilePath(blockId, fileDirectory);
@@ -135,7 +153,7 @@ public void createBlock(final String blockId) {
       // Deserialize the target data in the corresponding file.
       try {
         final FileBlock<K> block = getBlockFromFile(blockId);
-        final Iterable<NonSerializedPartition<K>> partitionsInRange = 
block.getPartitions(keyRange);
+        final Iterable<NonSerializedPartition<K>> partitionsInRange = 
block.readPartitions(keyRange);
         return Optional.of(partitionsInRange);
       } catch (final IOException e) {
         throw new BlockFetchException(e);
@@ -144,18 +162,19 @@ public void createBlock(final String blockId) {
   }
 
   /**
-   * @see BlockStore#getSerializedPartitions(String, KeyRange)
+   * @see BlockStore#readSerializedPartitions(String, KeyRange)
    */
   @Override
   public <K extends Serializable>
-  Optional<Iterable<SerializedPartition<K>>> getSerializedPartitions(final 
String blockId, final KeyRange<K> keyRange) {
+  Optional<Iterable<SerializedPartition<K>>> readSerializedPartitions(final 
String blockId,
+                                                                      final 
KeyRange<K> keyRange) {
     final String filePath = DataUtil.blockIdToFilePath(blockId, fileDirectory);
     if (!new File(filePath).isFile()) {
       return Optional.empty();
     } else {
       try {
         final FileBlock<K> block = getBlockFromFile(blockId);
-        final Iterable<SerializedPartition<K>> partitionsInRange = 
block.getSerializedPartitions(keyRange);
+        final Iterable<SerializedPartition<K>> partitionsInRange = 
block.readSerializedPartitions(keyRange);
         return Optional.of(partitionsInRange);
       } catch (final IOException e) {
         throw new BlockFetchException(e);
@@ -169,13 +188,16 @@ public void createBlock(final String blockId) {
    * this store does not have to maintain any information about the block.
    *
    * @param blockId the ID of the block to commit.
+   * @return the size of each partition.
    */
   @Override
-  public void commitBlock(final String blockId) throws BlockWriteException {
+  public <K extends Serializable>
+  Optional<Map<K, Long>> commitBlock(final String blockId) throws 
BlockWriteException {
     final Block block = blockMap.get(blockId);
+    final Optional<Map<K, Long>> partitionSizes;
     if (block != null) {
       try {
-        block.commit();
+        partitionSizes = block.commit();
       } catch (final IOException e) {
         throw new BlockWriteException(e);
       }
@@ -183,6 +205,7 @@ public void commitBlock(final String blockId) throws 
BlockWriteException {
       throw new BlockWriteException(new Throwable("There isn't any block with 
id " + blockId));
     }
     blockMap.remove(blockId);
+    return partitionSizes;
   }
 
   /**
@@ -192,7 +215,7 @@ public void commitBlock(final String blockId) throws 
BlockWriteException {
    * @return whether the block exists or not.
    */
   @Override
-  public Boolean removeBlock(final String blockId) throws BlockFetchException {
+  public boolean removeBlock(final String blockId) throws BlockFetchException {
     final String filePath = DataUtil.blockIdToFilePath(blockId, fileDirectory);
 
     try {
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/LocalBlockStore.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/LocalBlockStore.java
index 61dd116d..4b419ccb 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/LocalBlockStore.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/LocalBlockStore.java
@@ -19,13 +19,12 @@
 import edu.snu.nemo.common.exception.BlockWriteException;
 import edu.snu.nemo.runtime.common.data.KeyRange;
 import edu.snu.nemo.runtime.executor.data.SerializerManager;
-import edu.snu.nemo.runtime.executor.data.NonSerializedPartition;
-import edu.snu.nemo.runtime.executor.data.SerializedPartition;
+import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
+import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
 import edu.snu.nemo.runtime.executor.data.block.Block;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
@@ -50,53 +49,70 @@ protected LocalBlockStore(final SerializerManager 
coderManager) {
   }
 
   /**
-   * @see BlockStore#putPartitions(String, Iterable)
+   * @see BlockStore#write(String, Serializable, Object)
    */
   @Override
-  public final <K extends Serializable>
-  Optional<List<Long>> putPartitions(final String blockId,
-                                     final Iterable<NonSerializedPartition<K>> 
partitions)
+  public final <K extends Serializable> void write(final String blockId,
+                                                   final K key,
+                                                   final Object element) 
throws BlockWriteException {
+    try {
+      final Block<K> block = blockMap.get(blockId);
+      if (block == null) {
+        throw new BlockWriteException(new Throwable("The block " + blockId + 
"is not created yet."));
+      }
+      block.write(key, element);
+    } catch (final IOException e) {
+      throw new BlockWriteException(new Throwable("Failed to write an element to this block."));
+    }
+  }
+
+  /**
+   * @see BlockStore#writePartitions(String, Iterable)
+   */
+  @Override
+  public final <K extends Serializable> void writePartitions(final String 
blockId,
+                                                             final 
Iterable<NonSerializedPartition<K>> partitions)
       throws BlockWriteException {
     try {
       final Block<K> block = blockMap.get(blockId);
       if (block == null) {
         throw new BlockWriteException(new Throwable("The block " + blockId + 
"is not created yet."));
       }
-      return block.putPartitions(partitions);
+      block.writePartitions(partitions);
     } catch (final IOException e) {
       throw new BlockWriteException(new Throwable("Failed to store partitions 
to this block."));
     }
   }
 
   /**
-   * @see BlockStore#putSerializedPartitions(String, Iterable)
+   * @see BlockStore#writeSerializedPartitions(String, Iterable)
    */
   @Override
   public final <K extends Serializable>
-  List<Long> putSerializedPartitions(final String blockId,
-                                     final Iterable<SerializedPartition<K>> 
partitions) {
+  void writeSerializedPartitions(final String blockId,
+                                 final Iterable<SerializedPartition<K>> 
partitions) {
     try {
       final Block<K> block = blockMap.get(blockId);
       if (block == null) {
         throw new BlockWriteException(new Throwable("The block " + blockId + 
"is not created yet."));
       }
-      return block.putSerializedPartitions(partitions);
+      block.writeSerializedPartitions(partitions);
     } catch (final IOException e) {
       throw new BlockWriteException(new Throwable("Failed to store partitions 
to this block."));
     }
   }
 
   /**
-   * @see BlockStore#getPartitions(String, KeyRange)
+   * @see BlockStore#readPartitions(String, KeyRange)
    */
   @Override
   public final <K extends Serializable>
-  Optional<Iterable<NonSerializedPartition<K>>> getPartitions(final String 
blockId, final KeyRange<K> keyRange) {
+  Optional<Iterable<NonSerializedPartition<K>>> readPartitions(final String 
blockId, final KeyRange<K> keyRange) {
     final Block<K> block = blockMap.get(blockId);
 
     if (block != null) {
       try {
-        final Iterable<NonSerializedPartition<K>> partitionsInRange = 
block.getPartitions(keyRange);
+        final Iterable<NonSerializedPartition<K>> partitionsInRange = 
block.readPartitions(keyRange);
         return Optional.of(partitionsInRange);
       } catch (final IOException e) {
         throw new BlockFetchException(e);
@@ -107,16 +123,17 @@ protected LocalBlockStore(final SerializerManager 
coderManager) {
   }
 
   /**
-   * @see BlockStore#getSerializedPartitions(String, 
edu.snu.nemo.runtime.common.data.KeyRange)
+   * @see BlockStore#readSerializedPartitions(String, 
edu.snu.nemo.runtime.common.data.KeyRange)
    */
   @Override
   public final <K extends Serializable>
-  Optional<Iterable<SerializedPartition<K>>> getSerializedPartitions(final 
String blockId, final KeyRange<K> keyRange) {
+  Optional<Iterable<SerializedPartition<K>>> readSerializedPartitions(final 
String blockId,
+                                                                      final 
KeyRange<K> keyRange) {
     final Block<K> block = blockMap.get(blockId);
 
     if (block != null) {
       try {
-        final Iterable<SerializedPartition<K>> partitionsInRange = 
block.getSerializedPartitions(keyRange);
+        final Iterable<SerializedPartition<K>> partitionsInRange = 
block.readSerializedPartitions(keyRange);
         return Optional.of(partitionsInRange);
       } catch (final IOException e) {
         throw new BlockFetchException(e);
@@ -130,11 +147,11 @@ protected LocalBlockStore(final SerializerManager 
coderManager) {
    * @see BlockStore#commitBlock(String)
    */
   @Override
-  public final void commitBlock(final String blockId) {
+  public final <K extends Serializable> Optional<Map<K, Long>> 
commitBlock(final String blockId) {
     final Block block = blockMap.get(blockId);
     if (block != null) {
       try {
-        block.commit();
+        return block.commit();
       } catch (final IOException e) {
         throw new BlockWriteException(e);
       }
@@ -146,7 +163,7 @@ public final void commitBlock(final String blockId) {
   /**
    * @return the map between the IDs and {@link Block}.
    */
-  public final Map<String, Block> getBlockMap() {
+  protected final Map<String, Block> getBlockMap() {
     return blockMap;
   }
 }
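
For reference, a minimal usage sketch of the element-wise write path added
above, assuming a BlockStore implementation such as LocalBlockStore. The
"store" variable, the Integer keys, and the String elements are illustrative
placeholders, not code from this PR:

  // Hedged sketch: drive the new element-wise API end to end.
  store.createBlock("Block-0");                   // a block must exist before writes
  store.write("Block-0", 0, "first element");     // elements are written one by one
  store.write("Block-0", 1, "second element");
  // Committing now reports the per-key partition sizes, when available.
  final Optional<Map<Integer, Long>> partitionSizes = store.commitBlock("Block-0");
  partitionSizes.ifPresent(sizes ->
      sizes.forEach((key, size) -> System.out.println(key + " -> " + size + " bytes")));
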
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/LocalFileStore.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/LocalFileStore.java
index fd54512d..90f3df83 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/LocalFileStore.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/LocalFileStore.java
@@ -75,7 +75,7 @@ public void createBlock(final String blockId) {
    * @return whether the block exists or not.
    */
   @Override
-  public Boolean removeBlock(final String blockId) throws BlockFetchException {
+  public boolean removeBlock(final String blockId) throws BlockFetchException {
     final FileBlock fileBlock = (FileBlock) getBlockMap().remove(blockId);
     if (fileBlock == null) {
       return false;
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/MemoryStore.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/MemoryStore.java
index d0b593e5..1d5a02df 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/MemoryStore.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/MemoryStore.java
@@ -51,7 +51,7 @@ public void createBlock(final String blockId) {
    * @see BlockStore#removeBlock(String)
    */
   @Override
-  public Boolean removeBlock(final String blockId) {
+  public boolean removeBlock(final String blockId) {
     return getBlockMap().remove(blockId) != null;
   }
 }
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/SerializedMemoryStore.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/SerializedMemoryStore.java
index 718a2213..f93fcec9 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/SerializedMemoryStore.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/SerializedMemoryStore.java
@@ -50,7 +50,7 @@ public void createBlock(final String blockId) {
    * @see BlockStore#removeBlock(String)
    */
   @Override
-  public Boolean removeBlock(final String blockId) {
+  public boolean removeBlock(final String blockId) {
     return getBlockMap().remove(blockId) != null;
   }
 }
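
A note on the Boolean -> boolean change repeated across the stores: returning
the primitive rules out a null result, so call sites that branch on the return
value can no longer hit a NullPointerException through auto-unboxing. A hedged
illustration (the "store" variable and the surrounding code are placeholders):

  // With the wrapper type, a null return would throw here at runtime during
  // auto-unboxing; with the primitive return type that failure mode is gone.
  if (store.removeBlock(blockId)) {
    System.out.println("Removed block " + blockId);
  }
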
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index 61dfeba1..9032b806 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@ -114,8 +114,6 @@ public InputReader(final int dstTaskIndex,
 
   /**
    * Read data in the assigned range of hash value.
-   * Constraint: If a block is written by {@link 
OutputWriter#dataSkewWrite(List)}
-   * or {@link OutputWriter#writeShuffle(List)}, it must be read using this 
method.
    *
    * @return the list of the completable future of the data.
    */
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
index 0c665f11..59d55c3b 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -23,7 +23,6 @@
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
-import edu.snu.nemo.runtime.executor.data.Partition;
 import edu.snu.nemo.runtime.executor.data.partitioner.*;
 
 import javax.annotation.Nullable;
@@ -38,11 +37,20 @@
   private final String srcVertexId;
   @Nullable private final IRVertex dstIrVertex;
   private final DataStoreProperty.Value blockStoreValue;
-  private final Map<PartitionerProperty.Value, Partitioner> partitionerMap;
-  private final List<Long> accumulatedPartitionSizeInfo;
-  private final List<Long> writtenBytes;
+  private Optional<Long> writtenBytes;
   private final BlockManagerWorker blockManagerWorker;
+  private final Partitioner partitioner;
 
+  /**
+   * Constructor.
+   *
+   * @param hashRangeMultiplier the {@link 
edu.snu.nemo.conf.JobConf.HashRangeMultiplier}.
+   * @param srcTaskIdx          the index of the source task.
+   * @param srcRuntimeVertexId  the ID of the source vertex.
+   * @param dstIrVertex         the destination IR vertex.
+   * @param runtimeEdge         the {@link RuntimeEdge}.
+   * @param blockManagerWorker  the {@link BlockManagerWorker}.
+   */
   public OutputWriter(final int hashRangeMultiplier,
                       final int srcTaskIdx,
                       final String srcRuntimeVertexId,
@@ -57,14 +65,27 @@ public OutputWriter(final int hashRangeMultiplier,
     this.dstIrVertex = dstIrVertex;
     this.blockManagerWorker = blockManagerWorker;
     this.blockStoreValue = 
runtimeEdge.getProperty(ExecutionProperty.Key.DataStore);
-    this.partitionerMap = new HashMap<>();
-    this.writtenBytes = new ArrayList<>();
-    // TODO #511: Refactor metric aggregation for (general) run-time 
optimization.
-    this.accumulatedPartitionSizeInfo = new ArrayList<>();
-    partitionerMap.put(PartitionerProperty.Value.IntactPartitioner, new 
IntactPartitioner());
-    partitionerMap.put(PartitionerProperty.Value.HashPartitioner, new 
HashPartitioner());
-    partitionerMap.put(PartitionerProperty.Value.DataSkewHashPartitioner,
-        new DataSkewHashPartitioner(hashRangeMultiplier));
+    this.writtenBytes = Optional.empty();
+
+    // Setup partitioner
+    final int dstParallelism = getDstParallelism();
+    final KeyExtractor keyExtractor = 
runtimeEdge.getProperty(ExecutionProperty.Key.KeyExtractor);
+    final PartitionerProperty.Value partitionerPropertyValue =
+        runtimeEdge.getProperty(ExecutionProperty.Key.Partitioner);
+    switch (partitionerPropertyValue) {
+      case IntactPartitioner:
+        this.partitioner = new IntactPartitioner();
+        break;
+      case HashPartitioner:
+        this.partitioner = new HashPartitioner(dstParallelism, keyExtractor);
+        break;
+      case DataSkewHashPartitioner:
+        this.partitioner = new DataSkewHashPartitioner(hashRangeMultiplier, 
dstParallelism, keyExtractor);
+        break;
+      default:
+        throw new UnsupportedPartitionerException(
+            new Throwable("Partitioner " + partitionerPropertyValue + " is not 
supported."));
+    }
     blockManagerWorker.createBlock(blockId, blockStoreValue);
   }
 
@@ -74,52 +95,15 @@ public OutputWriter(final int hashRangeMultiplier,
    * @param dataToWrite An iterable for the elements to be written.
    */
   public void write(final Iterable dataToWrite) {
-    final Boolean isDataSizeMetricCollectionEdge = 
MetricCollectionProperty.Value.DataSkewRuntimePass
-        
.equals(runtimeEdge.getProperty(ExecutionProperty.Key.MetricCollection));
-
-    // Group the data into blocks.
-    final PartitionerProperty.Value partitionerPropertyValue =
-        runtimeEdge.getProperty(ExecutionProperty.Key.Partitioner);
-    final int dstParallelism = getDstParallelism();
-
-    final Partitioner partitioner = 
partitionerMap.get(partitionerPropertyValue);
-    if (partitioner == null) {
-      throw new UnsupportedPartitionerException(
-          new Throwable("Partitioner " + partitionerPropertyValue + " is not 
supported."));
-    }
-
-    final KeyExtractor keyExtractor = 
runtimeEdge.getProperty(ExecutionProperty.Key.KeyExtractor);
-    final List<Partition> partitionsToWrite;
-
     final DuplicateEdgeGroupPropertyValue duplicateDataProperty =
         runtimeEdge.getProperty(ExecutionProperty.Key.DuplicateEdgeGroup);
-    if (duplicateDataProperty != null
-        && 
!duplicateDataProperty.getRepresentativeEdgeId().equals(runtimeEdge.getId())
-        && duplicateDataProperty.getGroupSize() > 1) {
-      partitionsToWrite = partitioner.partition(Collections.emptyList(), 
dstParallelism, keyExtractor);
-    } else {
-      partitionsToWrite = partitioner.partition(dataToWrite, dstParallelism, 
keyExtractor);
-    }
-
-    // Write the grouped blocks into partitions.
-    // TODO #492: Modularize the data communication pattern.
-    final DataCommunicationPatternProperty.Value comValue =
-        
runtimeEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern);
-
-    if (DataCommunicationPatternProperty.Value.OneToOne.equals(comValue)) {
-      writeOneToOne(partitionsToWrite);
-    } else if 
(DataCommunicationPatternProperty.Value.BroadCast.equals(comValue)) {
-      writeBroadcast(partitionsToWrite);
-    } else if 
(DataCommunicationPatternProperty.Value.Shuffle.equals(comValue)) {
-      // If the dynamic optimization which detects data skew is enabled, sort 
the data and write it.
-      if (isDataSizeMetricCollectionEdge) {
-        dataSkewWrite(partitionsToWrite);
-      } else {
-        writeShuffle(partitionsToWrite);
-      }
-    } else {
-      throw new UnsupportedCommPatternException(new Exception("Communication 
pattern not supported"));
-    }
+    if (duplicateDataProperty == null
+        || 
duplicateDataProperty.getRepresentativeEdgeId().equals(runtimeEdge.getId())
+        || duplicateDataProperty.getGroupSize() <= 1) {
+      dataToWrite.forEach(element -> {
+        blockManagerWorker.write(blockId, partitioner.partition(element), 
element, blockStoreValue);
+      });
+    } // Otherwise, no write is needed because the data is duplicated.
   }
 
   /**
@@ -133,80 +117,19 @@ public void close() {
     final DuplicateEdgeGroupPropertyValue duplicateDataProperty =
         runtimeEdge.getProperty(ExecutionProperty.Key.DuplicateEdgeGroup);
     final int multiplier = duplicateDataProperty == null ? 1 : 
duplicateDataProperty.getGroupSize();
-    blockManagerWorker.commitBlock(blockId, blockStoreValue,
-        accumulatedPartitionSizeInfo, srcVertexId, getDstParallelism() * 
multiplier, usedDataHandling);
+
+    final boolean isDataSizeMetricCollectionEdge = 
MetricCollectionProperty.Value.DataSkewRuntimePass
+        
.equals(runtimeEdge.getProperty(ExecutionProperty.Key.MetricCollection));
+    this.writtenBytes = blockManagerWorker.commitBlock(
+        blockId, blockStoreValue, isDataSizeMetricCollectionEdge, srcVertexId,
+        getDstParallelism() * multiplier, usedDataHandling);
   }
 
   /**
    * @return the total written bytes.
    */
   public Optional<Long> getWrittenBytes() {
-    if (writtenBytes.isEmpty()) {
-      return Optional.empty(); // no serialized data.
-    } else {
-      long totalWrittenBytes = 0;
-      for (final long writtenPartitionBytes : writtenBytes) {
-        totalWrittenBytes += writtenPartitionBytes;
-      }
-      return Optional.of(totalWrittenBytes);
-    }
-  }
-
-  private void writeOneToOne(final List<Partition> partitionsToWrite) {
-    // Write data.
-    final Optional<List<Long>> partitionSizeList =
-        blockManagerWorker.putPartitions(blockId, partitionsToWrite, 
blockStoreValue);
-    partitionSizeList.ifPresent(this::addWrittenBytes);
-  }
-
-  private void writeBroadcast(final List<Partition> partitionsToWrite) {
-    writeOneToOne(partitionsToWrite);
-  }
-
-  private void writeShuffle(final List<Partition> partitionsToWrite) {
-    final int dstParallelism = getDstParallelism();
-    if (partitionsToWrite.size() != dstParallelism) {
-      throw new BlockWriteException(
-          new Throwable("The number of given blocks are not matched with the 
destination parallelism."));
-    }
-
-    // Write data.
-    final Optional<List<Long>> partitionSizeList =
-        blockManagerWorker.putPartitions(blockId, partitionsToWrite, 
blockStoreValue);
-    partitionSizeList.ifPresent(this::addWrittenBytes);
-  }
-
-  /**
-   * Writes partitions in a single block and collects the size of each 
partition.
-   * This function will be called only when we need to split or recombine an 
output data from a task after it is stored
-   * (e.g., dynamic data skew handling).
-   * We extend the hash range with the factor {@link 
edu.snu.nemo.conf.JobConf.HashRangeMultiplier} in advance
-   * to prevent the extra deserialize - rehash - serialize process.
-   * Each data of this block having same key hash value will be collected as a 
single partition.
-   * This partition will be the unit of retrieval and recombination of this 
block.
-   * Constraint: If a block is written by this method, it have to be read by 
{@link InputReader#readDataInRange()}.
-   * TODO #378: Elaborate block construction during data skew pass
-   *
-   * @param partitionsToWrite a list of the partitions to be written.
-   */
-  private void dataSkewWrite(final List<Partition> partitionsToWrite) {
-
-    // Write data.
-    final Optional<List<Long>> partitionSizeList =
-        blockManagerWorker.putPartitions(blockId, partitionsToWrite, 
blockStoreValue);
-    partitionSizeList.ifPresent(partitionsSize -> {
-      addWrittenBytes(partitionsSize);
-      this.accumulatedPartitionSizeInfo.addAll(partitionsSize);
-    });
-  }
-
-  /**
-   * Accumulates the size of written partitions.
-   *
-   * @param partitionSizeList the list of written partitions.
-   */
-  private void addWrittenBytes(final List<Long> partitionSizeList) {
-    partitionSizeList.forEach(writtenBytes::add);
+    return writtenBytes;
   }
 
   /**
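
With this change, OutputWriter fixes a single per-element Partitioner in its
constructor, and write() asks it for a key per element via
partitioner.partition(element). The Partitioner interface itself is not shown
in this diff, so the following is only a sketch of what a per-element hash
partitioner might look like, inferred from the
new HashPartitioner(dstParallelism, keyExtractor) call above and assuming
KeyExtractor exposes an extractKey(Object) accessor:

  // Hypothetical reconstruction, not the project's actual class.
  public final class HashPartitioner implements Partitioner {
    private final int dstParallelism;
    private final KeyExtractor keyExtractor;

    public HashPartitioner(final int dstParallelism, final KeyExtractor keyExtractor) {
      this.dstParallelism = dstParallelism;
      this.keyExtractor = keyExtractor;
    }

    public Integer partition(final Object element) {
      // Map each element to one of the dstParallelism partitions by key hash.
      return Math.abs(keyExtractor.extractKey(element).hashCode() % dstParallelism);
    }
  }
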
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 532ebd6b..1c4e4586 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -280,7 +280,7 @@ private void handleControlMessage(final 
ControlMessage.Message message) {
     case DataSizeMetric:
       final ControlMessage.DataSizeMetricMsg dataSizeMetricMsg = 
message.getDataSizeMetricMsg();
      // TODO #511: Refactor metric aggregation for (general) run-time 
optimization.
-      accumulateBarrierMetric(dataSizeMetricMsg.getPartitionSizeInfoList(),
+      accumulateBarrierMetric(dataSizeMetricMsg.getPartitionSizeList(),
           dataSizeMetricMsg.getSrcIRVertexId(), 
dataSizeMetricMsg.getBlockId());
       break;
     case MetricMessageReceived:
@@ -300,21 +300,26 @@ private void handleControlMessage(final 
ControlMessage.Message message) {
   * TODO #511: Refactor metric aggregation for (general) run-time 
optimization.
    * TODO #513: Replace MetricCollectionBarrierVertex with a Customizable 
IRVertex.
    *
-   * @param blockSizeInfo the block size info to accumulate.
-   * @param srcVertexId   the ID of the source vertex.
-   * @param blockId       the ID of the block.
+   * @param partitionSizeInfo the size of partitions in a block to accumulate.
+   * @param srcVertexId       the ID of the source vertex.
+   * @param blockId           the ID of the block.
    */
-  private void accumulateBarrierMetric(final List<Long> blockSizeInfo,
-                                      final String srcVertexId,
-                                      final String blockId) {
+  private void accumulateBarrierMetric(final 
List<ControlMessage.PartitionSizeEntry> partitionSizeInfo,
+                                       final String srcVertexId,
+                                       final String blockId) {
     final IRVertex vertexToSendMetricDataTo = irVertices.stream()
         .filter(irVertex -> irVertex.getId().equals(srcVertexId)).findFirst()
         .orElseThrow(() -> new RuntimeException(srcVertexId + " doesn't exist 
in the submitted Physical Plan"));
 
+    final List<Pair<Integer, Long>> partitionSizes = new ArrayList<>();
+    partitionSizeInfo.forEach(partitionSizeEntry -> {
+      partitionSizes.add(Pair.of(partitionSizeEntry.getKey(), 
partitionSizeEntry.getSize()));
+    });
+
     if (vertexToSendMetricDataTo instanceof MetricCollectionBarrierVertex) {
-      final MetricCollectionBarrierVertex<Long> metricCollectionBarrierVertex =
+      final MetricCollectionBarrierVertex<Pair<Integer, Long>> 
metricCollectionBarrierVertex =
           (MetricCollectionBarrierVertex) vertexToSendMetricDataTo;
-      metricCollectionBarrierVertex.accumulateMetric(blockId, blockSizeInfo);
+      metricCollectionBarrierVertex.accumulateMetric(blockId, partitionSizes);
     } else {
       throw new RuntimeException("Something wrong happened at 
DataSkewCompositePass.");
     }
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
index 22e47b85..3861ee23 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
+++ 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
@@ -15,15 +15,13 @@
  */
 package edu.snu.nemo.tests.runtime.common.optimizer.pass.runtime;
 
+import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.runtime.common.data.KeyRange;
 import edu.snu.nemo.runtime.common.optimizer.pass.runtime.DataSkewRuntimePass;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import static org.junit.Assert.assertEquals;
 
@@ -31,14 +29,14 @@
  * Test {@link DataSkewRuntimePass}.
  */
 public class DataSkewRuntimePassTest {
-  private final Map<String, List<Long>> testMetricData = new HashMap<>();
+  private final Map<String, List<Pair<Integer, Long>>> testMetricData = new 
HashMap<>();
 
   @Before
   public void setUp() {
     // The sum is 30 for each of the hash ranges: 0-3, 3-5, 5-7, 7-9, 9-10.
-    testMetricData.put("1", Arrays.asList(1L, 2L, 4L, 2L, 1L, 8L, 2L, 4L, 2L, 
10L));
-    testMetricData.put("2", Arrays.asList(3L, 5L, 5L, 7L, 10L, 3L, 5L, 4L, 8L, 
5L));
-    testMetricData.put("3", Arrays.asList(2L, 3L, 5L, 5L, 5L, 6L, 6L, 8L, 4L, 
15L));
+    testMetricData.put("Block-1", buildPartitionSizeList(Arrays.asList(1L, 2L, 
4L, 2L, 1L, 8L, 2L, 4L, 2L, 10L)));
+    testMetricData.put("Block-2", buildPartitionSizeList(Arrays.asList(3L, 5L, 
5L, 7L, 10L, 3L, 5L, 4L, 8L, 5L)));
+    testMetricData.put("Block-3", buildPartitionSizeList(Arrays.asList(2L, 3L, 
5L, 5L, 5L, 6L, 6L, 8L, 4L, 15L)));
   }
 
   /**
@@ -62,4 +60,20 @@ public void testDataSkewDynamicOptimizationPass() {
     assertEquals(9, keyRanges.get(4).rangeBeginInclusive());
     assertEquals(10, keyRanges.get(4).rangeEndExclusive());
   }
+
+  /**
+   * Builds partition size metrics from the given partition sizes for a test.
+   *
+   * @param partitionSizes the size of partitions.
+   * @return the partition size metrics.
+   */
+  private static List<Pair<Integer, Long>> buildPartitionSizeList(final 
List<Long> partitionSizes) {
+    final List<Pair<Integer, Long>> partitionMetrics = new 
ArrayList<>(partitionSizes.size());
+    int key = 0;
+    for (final long partitionSize : partitionSizes) {
+      partitionMetrics.add(Pair.of(key, partitionSize));
+      key++;
+    }
+    return partitionMetrics;
+  }
 }
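
As a sanity check of the comment in setUp(), the per-key sums across
Block-1..3 do come out to 30 per stated range: keys 0-2 give
(1+3+2) + (2+5+3) + (4+5+5) = 30, keys 3-4 give (2+7+5) + (1+10+5) = 30, and
key 9 alone gives 10+5+15 = 30. A small sketch of the same aggregation,
assuming Pair exposes left()/right() accessors:

  // Sum partition sizes per key across all blocks in testMetricData.
  final Map<Integer, Long> sizePerKey = new HashMap<>();
  testMetricData.values().forEach(pairs ->
      pairs.forEach(pair -> sizePerKey.merge(pair.left(), pair.right(), Long::sum)));
  // sizePerKey: {0=6, 1=10, 2=14, 3=14, 4=16, 5=17, 6=13, 7=16, 8=14, 9=30}
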
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
index a4550559..e36459ea 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
+++ 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
@@ -27,6 +27,8 @@
 import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
 import edu.snu.nemo.runtime.common.state.BlockState;
 import edu.snu.nemo.runtime.executor.data.*;
+import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
+import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
 import 
edu.snu.nemo.runtime.executor.data.streamchainer.CompressionStreamChainer;
 import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
 import edu.snu.nemo.runtime.executor.data.stores.*;
@@ -124,10 +126,10 @@ public void setUp() throws Exception {
         number -> 
readTaskIdList.add(RuntimeIdGenerator.generateLogicalTaskId("Read_IR_vertex")));
 
     // Generates the ids and the data of the blocks to be used.
+    final String shuffleEdge = 
RuntimeIdGenerator.generateRuntimeEdgeId("shuffle_edge");
     IntStream.range(0, NUM_WRITE_TASKS).forEach(writeTaskIdx -> {
       // Create a block for each writer task.
-      final String blockId = RuntimeIdGenerator.generateBlockId(
-          
RuntimeIdGenerator.generateRuntimeEdgeId(String.valueOf(blockIdList.size())), 
writeTaskIdx);
+      final String blockId = RuntimeIdGenerator.generateBlockId(shuffleEdge, 
writeTaskIdx);
       blockIdList.add(blockId);
       blockManagerMaster.initializeState(blockId, "Unused");
       blockManagerMaster.onBlockStateChanged(
@@ -139,23 +141,23 @@ public void setUp() throws Exception {
       IntStream.range(0, NUM_READ_TASKS).forEach(readTaskIdx -> {
         final int partitionsCount = writeTaskIdx * NUM_READ_TASKS + 
readTaskIdx;
         partitionsForBlock.add(new NonSerializedPartition(
-            readTaskIdx, getRangedNumList(partitionsCount * DATA_SIZE, 
(partitionsCount + 1) * DATA_SIZE)));
+            readTaskIdx, getRangedNumList(partitionsCount * DATA_SIZE, 
(partitionsCount + 1) * DATA_SIZE), -1, -1));
       });
     });
 
     // Following part is for the concurrent read test.
-    final String writeTaskId = 
RuntimeIdGenerator.generateLogicalTaskId("Conc_write_IR_vertex");
+    final String writeTaskId = 
RuntimeIdGenerator.generateLogicalTaskId("conc_write_IR_vertex");
     final List<String> concReadTaskIdList = new 
ArrayList<>(NUM_CONC_READ_TASKS);
+    final String concEdge = 
RuntimeIdGenerator.generateRuntimeEdgeId("conc_read_edge");
 
     // Generates the ids and the data to be used.
-    concBlockId = RuntimeIdGenerator.generateBlockId(
-        RuntimeIdGenerator.generateRuntimeEdgeId("concurrent read"), 
NUM_WRITE_TASKS + NUM_READ_TASKS + 1);
-    blockManagerMaster.initializeState(concBlockId, "Unused");
+    concBlockId = RuntimeIdGenerator.generateBlockId(concEdge, NUM_WRITE_TASKS 
+ NUM_READ_TASKS + 1);
+    blockManagerMaster.initializeState(concBlockId, "unused");
     blockManagerMaster.onBlockStateChanged(
         concBlockId, BlockState.State.SCHEDULED, null);
     IntStream.range(0, NUM_CONC_READ_TASKS).forEach(
-        number -> 
concReadTaskIdList.add(RuntimeIdGenerator.generateLogicalTaskId("Conc_read_IR_vertex")));
-    concBlockPartition = new NonSerializedPartition(0, getRangedNumList(0, 
CONC_READ_DATA_SIZE));
+        number -> 
concReadTaskIdList.add(RuntimeIdGenerator.generateLogicalTaskId("conc_read_IR_vertex")));
+    concBlockPartition = new NonSerializedPartition(0, getRangedNumList(0, 
CONC_READ_DATA_SIZE), -1, -1);
 
     // Following part is for the shuffle in hash range test
     final int numHashedBlocks = NUM_WRITE_HASH_TASKS;
@@ -168,15 +170,15 @@ public void setUp() throws Exception {
 
     // Generates the ids of the tasks to be used.
     IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(
-        number -> 
writeHashTaskIdList.add(RuntimeIdGenerator.generateLogicalTaskId("Hash_write_IR_vertex")));
+        number -> 
writeHashTaskIdList.add(RuntimeIdGenerator.generateLogicalTaskId("hash_write_IR_vertex")));
     IntStream.range(0, NUM_READ_HASH_TASKS).forEach(
-        number -> 
readHashTaskIdList.add(RuntimeIdGenerator.generateLogicalTaskId("Hash_read_IR_vertex")));
+        number -> 
readHashTaskIdList.add(RuntimeIdGenerator.generateLogicalTaskId("hash_read_IR_vertex")));
+    final String hashEdge = 
RuntimeIdGenerator.generateRuntimeEdgeId("hash_edge");
 
     // Generates the ids and the data of the blocks to be used.
     IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(writeTaskIdx -> {
       final String blockId = RuntimeIdGenerator.generateBlockId(
-          RuntimeIdGenerator.generateRuntimeEdgeId("shuffle in range"),
-          NUM_WRITE_TASKS + NUM_READ_TASKS + 1 + writeTaskIdx);
+          hashEdge, NUM_WRITE_TASKS + NUM_READ_TASKS + 1 + writeTaskIdx);
       hashedBlockIdList.add(blockId);
       blockManagerMaster.initializeState(blockId, "Unused");
       blockManagerMaster.onBlockStateChanged(
@@ -187,7 +189,7 @@ public void setUp() throws Exception {
           hashedBlock.add(new NonSerializedPartition(hashValue, 
getFixedKeyRangedNumList(
               hashValue,
               writeTaskIdx * HASH_DATA_SIZE * HASH_RANGE + hashValue * 
HASH_DATA_SIZE,
-              writeTaskIdx * HASH_DATA_SIZE * HASH_RANGE + (hashValue + 1) * 
HASH_DATA_SIZE))));
+              writeTaskIdx * HASH_DATA_SIZE * HASH_RANGE + (hashValue + 1) * 
HASH_DATA_SIZE), -1, -1)));
       hashedBlockPartitionList.add(hashedBlock);
     });
 
@@ -316,14 +318,15 @@ private void shuffle(final BlockStore writerSideStore,
           @Override
           public Boolean call() {
             try {
-              IntStream.range(writeTaskIdx, writeTaskIdx + 1).forEach(blockIdx 
-> {
-                final String blockId = blockIdList.get(blockIdx);
-                writerSideStore.createBlock(blockId);
-                writerSideStore.putPartitions(blockId, 
partitionsPerBlock.get(blockIdx));
-                writerSideStore.commitBlock(blockId);
-                blockManagerMaster.onBlockStateChanged(blockId, 
BlockState.State.COMMITTED,
-                    "Writer side of the shuffle edge");
-              });
+              final String blockId = blockIdList.get(writeTaskIdx);
+              writerSideStore.createBlock(blockId);
+              for (final NonSerializedPartition<Integer> partition : 
partitionsPerBlock.get(writeTaskIdx)) {
+                final Iterable data = partition.getData();
+                data.forEach(element -> writerSideStore.write(blockId, 
partition.getKey(), element));
+              }
+              writerSideStore.commitBlock(blockId);
+              blockManagerMaster.onBlockStateChanged(blockId, 
BlockState.State.COMMITTED,
+                  "Writer side of the shuffle edge");
               return true;
             } catch (final Exception e) {
               e.printStackTrace();
@@ -411,7 +414,7 @@ private void concurrentRead(final BlockStore 
writerSideStore,
       public Boolean call() {
         try {
           writerSideStore.createBlock(concBlockId);
-          writerSideStore.putPartitions(concBlockId, 
Collections.singleton(concBlockPartition));
+          writerSideStore.writePartitions(concBlockId, 
Collections.singleton(concBlockPartition));
           writerSideStore.commitBlock(concBlockId);
           blockManagerMaster.onBlockStateChanged(
               concBlockId, BlockState.State.COMMITTED, "Writer side of the 
concurrent read edge");
@@ -476,7 +479,7 @@ public Boolean call() {
    * Assumes following circumstances:
    * Task 1 (write (hash 0~3))->         (read (hash 0~1))-> Task 3
    * Task 2 (write (hash 0~3))-> shuffle (read (hash 2))-> Task 4
-   * (read (hash 3))-> Task 5
+   *                                     (read (hash 3))-> Task 5
    * It checks that each writer and reader does not throw any exception
    * and the read data is identical with written data (including the order).
    */
@@ -496,7 +499,7 @@ public Boolean call() {
             try {
               final String blockId = hashedBlockIdList.get(writeTaskIdx);
               writerSideStore.createBlock(blockId);
-              writerSideStore.putPartitions(blockId, 
hashedBlockPartitionList.get(writeTaskIdx));
+              writerSideStore.writePartitions(blockId, 
hashedBlockPartitionList.get(writeTaskIdx));
               writerSideStore.commitBlock(blockId);
               blockManagerMaster.onBlockStateChanged(blockId, 
BlockState.State.COMMITTED,
                   "Writer side of the shuffle in hash range edge");
@@ -582,14 +585,14 @@ private void readResultCheck(final String blockId,
                                final BlockStore blockStore,
                                final Iterable expectedResult) throws 
IOException {
     final Optional<Iterable<SerializedPartition<Integer>>> optionalSerResult =
-        blockStore.getSerializedPartitions(blockId, hashRange);
+        blockStore.readSerializedPartitions(blockId, hashRange);
     if (!optionalSerResult.isPresent()) {
       throw new IOException("The (serialized) result of get block" + blockId + 
" in range " +
           hashRange + " is empty.");
     }
     final Iterable<SerializedPartition<Integer>> serializedResult = 
optionalSerResult.get();
     final Optional<Iterable<NonSerializedPartition>> optionalNonSerResult =
-        blockStore.getPartitions(blockId, hashRange);
+        blockStore.readPartitions(blockId, hashRange);
     if (!optionalNonSerResult.isPresent()) {
       throw new IOException("The (non-serialized) result of get block " + blockId + " in range " +
           hashRange + " is empty.");
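
On the read side, the renamed accessors keep the shape of their get*
predecessors. A minimal hedged sketch; "store", "blockId", "hashRange", and
process() are placeholders:

  // Read deserialized and serialized partitions with the renamed APIs.
  final Optional<Iterable<NonSerializedPartition<Integer>>> deserialized =
      store.readPartitions(blockId, hashRange);
  deserialized.ifPresent(partitions ->
      partitions.forEach(partition -> process(partition.getData())));
  final Optional<Iterable<SerializedPartition<Integer>>> serialized =
      store.readSerializedPartitions(blockId, hashRange);
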


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
