johnyangk closed pull request #68: [NEMO-125] Fix data loss bug caused by SailfishSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/68
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not display the original diff of a fork's pull request once it is merged):

diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
index ab1ece120..d5c50e4bf 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
@@ -32,9 +32,8 @@ public SailfishPass() {
         new SailfishRelayReshapingPass(),
         new SailfishEdgeDataFlowModelPass(),
         new SailfishEdgeDataStorePass(),
-        // TODO #125: Fix data loss bug caused by SailfishSchedulingPolicy
-        // new SailfishEdgeDecoderPass(),
-        // new SailfishEdgeEncoderPass(),
+        new SailfishEdgeDecoderPass(),
+        new SailfishEdgeEncoderPass(),
         new SailfishEdgeUsedDataHandlingPass()
     ));
   }
diff --git 
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java 
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
index 507d36542..0ffba2ee5 100644
--- 
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
+++ 
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
@@ -40,13 +40,13 @@
   private static final String outputFileName = "sample_output_wordcount";
   private static final String testResourceFileName = "test_output_wordcount";
   private static final String executorResourceFileName = fileBasePath + 
"beam_sample_executor_resources.json";
+  private static final String oneExecutorResourceFileName = fileBasePath + 
"beam_sample_one_executor_resources.json";
   private static final String inputFilePath =  fileBasePath + inputFileName;
   private static final String outputFilePath =  fileBasePath + outputFileName;
 
   @Before
   public void setUp() throws Exception {
     builder = new ArgBuilder()
-        .addResourceJson(executorResourceFileName)
         .addUserMain(WordCount.class.getCanonicalName())
         .addUserArgs(inputFilePath, outputFilePath);
   }
@@ -63,6 +63,7 @@ public void tearDown() throws Exception {
   @Test (timeout = TIMEOUT)
   public void test() throws Exception {
     JobLauncher.main(builder
+        .addResourceJson(executorResourceFileName)
         .addJobId(WordCountITCase.class.getSimpleName())
         
.addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
         .build());
@@ -71,14 +72,25 @@ public void test() throws Exception {
   @Test (timeout = TIMEOUT)
   public void testSailfish() throws Exception {
     JobLauncher.main(builder
+        .addResourceJson(executorResourceFileName)
         .addJobId(WordCountITCase.class.getSimpleName() + "_sailfish")
         
.addOptimizationPolicy(SailfishPolicyParallelismFive.class.getCanonicalName())
         .build());
   }
 
+  @Test (timeout = TIMEOUT)
+  public void testSailfishInOneExecutor() throws Exception {
+    JobLauncher.main(builder
+        .addResourceJson(oneExecutorResourceFileName)
+        .addJobId(WordCountITCase.class.getSimpleName() + 
"_sailfishInOneExecutor")
+        
.addOptimizationPolicy(SailfishPolicyParallelismFive.class.getCanonicalName())
+        .build());
+  }
+
   @Test (timeout = TIMEOUT)
   public void testPado() throws Exception {
     JobLauncher.main(builder
+        .addResourceJson(executorResourceFileName)
         .addJobId(WordCountITCase.class.getSimpleName() + "_pado")
         
.addOptimizationPolicy(PadoPolicyParallelismFive.class.getCanonicalName())
         .build());
diff --git a/examples/resources/beam_sample_one_executor_resources.json 
b/examples/resources/beam_sample_one_executor_resources.json
new file mode 100644
index 000000000..069ed973d
--- /dev/null
+++ b/examples/resources/beam_sample_one_executor_resources.json
@@ -0,0 +1,7 @@
+[
+  {
+    "type": "Transient",
+    "memory_mb": 512,
+    "capacity": 5
+  }
+]
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
index 69eb9bab1..50c43d930 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
@@ -49,44 +49,39 @@ private DataUtil() {
   /**
    * Serializes the elements in a non-serialized partition into an output 
stream.
    *
-   * @param encoderFactory                the encoderFactory to encode the 
elements.
+   * @param encoderFactory         the encoderFactory to encode the elements.
    * @param nonSerializedPartition the non-serialized partition to serialize.
    * @param bytesOutputStream      the output stream to write.
-   * @return total number of elements in the partition.
    * @throws IOException if fail to serialize.
    */
-  public static long serializePartition(final EncoderFactory encoderFactory,
-                                        final NonSerializedPartition 
nonSerializedPartition,
-                                        final OutputStream bytesOutputStream) 
throws IOException {
-    long elementsCount = 0;
+  private static void serializePartition(final EncoderFactory encoderFactory,
+                                         final NonSerializedPartition 
nonSerializedPartition,
+                                         final OutputStream bytesOutputStream) 
throws IOException {
     final EncoderFactory.Encoder encoder = 
encoderFactory.create(bytesOutputStream);
     for (final Object element : nonSerializedPartition.getData()) {
       encoder.encode(element);
-      elementsCount++;
     }
-
-    return elementsCount;
   }
 
   /**
    * Reads the data of a partition from an input stream and deserializes it.
    *
-   * @param elementsInPartition the number of elements in this partition.
-   * @param serializer          the serializer to decode the bytes.
-   * @param key                 the key value of the result partition.
-   * @param inputStream         the input stream which will return the data in 
the partition as bytes.
-   * @param <K>                 the key type of the partitions.
+   * @param partitionSize the size of the partition to deserialize.
+   * @param serializer    the serializer to decode the bytes.
+   * @param key           the key value of the result partition.
+   * @param inputStream   the input stream which will return the data in the 
partition as bytes.
+   * @param <K>           the key type of the partitions.
    * @return the list of deserialized elements.
    * @throws IOException if fail to deserialize.
    */
-  public static <K extends Serializable> NonSerializedPartition 
deserializePartition(final long elementsInPartition,
+  public static <K extends Serializable> NonSerializedPartition 
deserializePartition(final int partitionSize,
                                                                                
      final Serializer serializer,
                                                                                
      final K key,
                                                                                
      final InputStream inputStream)
       throws IOException {
     final List deserializedData = new ArrayList();
     final InputStreamIterator iterator = new 
InputStreamIterator(Collections.singletonList(inputStream).iterator(),
-        serializer, elementsInPartition);
+        serializer, partitionSize);
     iterator.forEachRemaining(deserializedData::add);
     return new NonSerializedPartition(key, deserializedData, 
iterator.getNumSerializedBytes(),
         iterator.getNumEncodedBytes());
@@ -111,15 +106,14 @@ public static long serializePartition(final 
EncoderFactory encoderFactory,
           final DirectByteArrayOutputStream bytesOutputStream = new 
DirectByteArrayOutputStream();
           final OutputStream wrappedStream = 
buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers());
       ) {
-        final long elementsTotal =
-            serializePartition(serializer.getEncoderFactory(), 
partitionToConvert, wrappedStream);
+        serializePartition(serializer.getEncoderFactory(), partitionToConvert, 
wrappedStream);
         // We need to close wrappedStream on here, because 
DirectByteArrayOutputStream:getBufDirectly() returns
         // inner buffer directly, which can be an unfinished(not flushed) 
buffer.
         wrappedStream.close();
         final byte[] serializedBytes = bytesOutputStream.getBufDirectly();
         final int actualLength = bytesOutputStream.getCount();
         serializedPartitions.add(
-            new SerializedPartition<>(partitionToConvert.getKey(), 
elementsTotal, serializedBytes, actualLength));
+            new SerializedPartition<>(partitionToConvert.getKey(), 
serializedBytes, actualLength));
       }
     }
     return serializedPartitions;
@@ -144,7 +138,7 @@ public static long serializePartition(final EncoderFactory 
encoderFactory,
       try (final ByteArrayInputStream byteArrayInputStream =
                new ByteArrayInputStream(partitionToConvert.getData())) {
         final NonSerializedPartition<K> deserializePartition = 
deserializePartition(
-            partitionToConvert.getElementsCount(), serializer, key, 
byteArrayInputStream);
+            partitionToConvert.getLength(), serializer, key, 
byteArrayInputStream);
         nonSerializedPartitions.add(deserializePartition);
       }
     }
@@ -211,7 +205,6 @@ public static Iterable concatNonSerPartitions(final 
Iterable<NonSerializedPartit
     private volatile T next;
     private volatile boolean cannotContinueDecoding = false;
     private volatile DecoderFactory.Decoder<T> decoder = null;
-    private volatile long elementsDecoded = 0;
     private volatile long numSerializedBytes = 0;
     private volatile long numEncodedBytes = 0;
 
@@ -221,8 +214,8 @@ public static Iterable concatNonSerPartitions(final 
Iterable<NonSerializedPartit
      * @param inputStreams The streams to read data from.
      * @param serializer   The serializer.
      */
-    public InputStreamIterator(final Iterator<InputStream> inputStreams,
-                               final Serializer<?, T> serializer) {
+    InputStreamIterator(final Iterator<InputStream> inputStreams,
+                        final Serializer<?, T> serializer) {
       this.inputStreams = inputStreams;
       this.serializer = serializer;
       // -1 means no limit.
@@ -234,12 +227,12 @@ public InputStreamIterator(final Iterator<InputStream> 
inputStreams,
      *
      * @param inputStreams The streams to read data from.
      * @param serializer   The serializer.
-     * @param limit        The number of elements from the {@link InputStream}.
+     * @param limit        The bytes to read from the {@link InputStream}.
      */
-    public InputStreamIterator(
+    private InputStreamIterator(
         final Iterator<InputStream> inputStreams,
         final Serializer<?, T> serializer,
-        final long limit) {
+        final int limit) {
       if (limit < 0) {
         throw new IllegalArgumentException("Negative limit not allowed.");
       }
@@ -256,7 +249,8 @@ public boolean hasNext() {
       if (cannotContinueDecoding) {
         return false;
       }
-      if (limit != -1 && limit == elementsDecoded) {
+      if (limit != -1 && limit == (serializedCountingStream == null
+          ? numSerializedBytes : numSerializedBytes + 
serializedCountingStream.getCount())) {
         cannotContinueDecoding = true;
         return false;
       }
@@ -280,7 +274,6 @@ public boolean hasNext() {
         try {
           next = decoder.decode();
           hasNext = true;
-          elementsDecoded++;
           return true;
         } catch (final IOException e) {
           // IOException from decoder indicates EOF event.
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
index 9a64d8601..41f125b65 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
@@ -78,8 +78,7 @@ private void writeToFile(final 
Iterable<SerializedPartition<K>> serializedPartit
     try (final FileOutputStream fileOutputStream = new 
FileOutputStream(filePath, true)) {
       for (final SerializedPartition<K> serializedPartition : 
serializedPartitions) {
         // Reserve a partition write and get the metadata.
-        metadata.writePartitionMetadata(
-            serializedPartition.getKey(), serializedPartition.getLength(), 
serializedPartition.getElementsCount());
+        metadata.writePartitionMetadata(serializedPartition.getKey(), 
serializedPartition.getLength());
         fileOutputStream.write(serializedPartition.getData(), 0, 
serializedPartition.getLength());
       }
     }
@@ -187,7 +186,7 @@ public void writeSerializedPartitions(final 
Iterable<SerializedPartition<K>> par
                   new LimitedInputStream(fileStream, 
partitionMetadata.getPartitionSize());
               final NonSerializedPartition<K> deserializePartition =
                   DataUtil.deserializePartition(
-                      partitionMetadata.getElementsTotal(), serializer, key, 
limitedInputStream);
+                      partitionMetadata.getPartitionSize(), serializer, key, 
limitedInputStream);
               deserializedPartitions.add(deserializePartition);
               // rearrange file pointer
               final long toSkip = partitionMetadata.getPartitionSize() - 
availableBefore + fileStream.available();
@@ -237,7 +236,7 @@ public void writeSerializedPartitions(final 
Iterable<SerializedPartition<K>> par
                 throw new IOException("The read data size does not match with 
the partition size.");
               }
               partitionsInRange.add(new SerializedPartition<>(
-                  key, partitionmetadata.getElementsTotal(), serializedData, 
serializedData.length));
+                  key, serializedData, serializedData.length));
             } else {
               // Have to skip this partition.
               skipBytes(fileStream, partitionmetadata.getPartitionSize());
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
index 3eafe8ebb..108a4e79b 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
@@ -58,18 +58,16 @@ public FileMetadata(final List<PartitionMetadata<K>> 
partitionMetadataList) {
    *
    * @param key     the key of the partition.
    * @param partitionSize the size of the partition.
-   * @param elementsTotal the number of elements in the partition.
    * @throws IOException if fail to append the partition metadata.
    */
   public final synchronized void writePartitionMetadata(final K key,
-                                                        final int 
partitionSize,
-                                                        final long 
elementsTotal) throws IOException {
+                                                        final int 
partitionSize) throws IOException {
     if (committed.get()) {
       throw new IOException("Cannot write a new block to a closed partition.");
     }
 
     final PartitionMetadata partitionMetadata =
-        new PartitionMetadata(key, partitionSize, writtenBytesCursor, 
elementsTotal);
+        new PartitionMetadata(key, partitionSize, writtenBytesCursor);
     partitionMetadataList.add(partitionMetadata);
     writtenBytesCursor += partitionSize;
   }
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/PartitionMetadata.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/PartitionMetadata.java
index 9d68e5a39..2fa9de22d 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/PartitionMetadata.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/PartitionMetadata.java
@@ -25,7 +25,6 @@
   private final K key;
   private final int partitionSize;
   private final long offset;
-  private final long elementsTotal;
 
   /**
    * Constructor.
@@ -33,16 +32,13 @@
    * @param key           the key of this partition.
    * @param partitionSize the size of this partition.
    * @param offset        the offset of this partition.
-   * @param elementsTotal the total number of elements in this partition.
    */
   public PartitionMetadata(final K key,
                            final int partitionSize,
-                           final long offset,
-                           final long elementsTotal) {
+                           final long offset) {
     this.key = key;
     this.partitionSize = partitionSize;
     this.offset = offset;
-    this.elementsTotal = elementsTotal;
   }
 
   /**
@@ -66,13 +62,6 @@ public long getOffset() {
     return offset;
   }
 
-  /**
-   * @return the total number of elements in this partition.
-   */
-  public long getElementsTotal() {
-    return elementsTotal;
-  }
-
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder();
@@ -82,8 +71,6 @@ public String toString() {
     sb.append(partitionSize);
     sb.append("/ offset: ");
     sb.append(offset);
-    sb.append("/ elementsTotal: ");
-    sb.append(elementsTotal);
     return sb.toString();
   }
 }
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/RemoteFileMetadata.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/RemoteFileMetadata.java
index 9a22e438d..8ad212635 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/RemoteFileMetadata.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/RemoteFileMetadata.java
@@ -83,7 +83,6 @@ public synchronized void commitBlock() throws IOException {
         dataOutputStream.write(key);
         dataOutputStream.writeInt(partitionMetadata.getPartitionSize());
         dataOutputStream.writeLong(partitionMetadata.getOffset());
-        dataOutputStream.writeLong(partitionMetadata.getElementsTotal());
       }
     }
     setCommitted(true);
@@ -127,7 +126,6 @@ public synchronized void commitBlock() throws IOException {
         final PartitionMetadata<T> partitionMetadata = new PartitionMetadata<>(
             SerializationUtils.deserialize(desKey),
             dataInputStream.readInt(),
-            dataInputStream.readLong(),
             dataInputStream.readLong()
         );
         partitionMetadataList.add(partitionMetadata);
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
index d6d63d1e2..40150002e 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
@@ -32,7 +32,6 @@
  */
 public final class SerializedPartition<K> implements Partition<byte[], K> {
   private final K key;
-  private volatile long elementsCount;
   private volatile byte[] serializedData;
   private volatile int length;
   private volatile boolean committed;
@@ -52,7 +51,6 @@
   public SerializedPartition(final K key,
                              final Serializer serializer) throws IOException {
     this.key = key;
-    this.elementsCount = 0;
     this.serializedData = new byte[0];
     this.length = 0;
     this.committed = false;
@@ -66,16 +64,13 @@ public SerializedPartition(final K key,
    * Data cannot be written to this partition after the construction.
    *
    * @param key            the key.
-   * @param elementsTotal  the total number of elements.
    * @param serializedData the serialized data.
    * @param length         the length of the actual serialized data. (It can 
be different with serializedData.length)
    */
   public SerializedPartition(final K key,
-                             final long elementsTotal,
                              final byte[] serializedData,
                              final int length) {
     this.key = key;
-    this.elementsCount = elementsTotal;
     this.serializedData = serializedData;
     this.length = length;
     this.committed = true;
@@ -97,7 +92,6 @@ public void write(final Object element) throws IOException {
     } else {
       try {
         encoder.encode(element);
-        elementsCount++;
       } catch (final IOException e) {
         wrappedStream.close();
       }
@@ -160,16 +154,4 @@ public int getLength() throws IOException {
       return length;
     }
   }
-
-  /**
-   * @return the number of elements.
-   * @throws IOException if the partition is not committed yet.
-   */
-  public long getElementsCount() throws IOException {
-    if (!committed) {
-      throw new IOException("The partition is not committed yet!");
-    } else {
-      return elementsCount;
-    }
-  }
 }
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
index 7a4068ebc..36db3ec1c 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
@@ -32,8 +32,8 @@
   private final int hashRangeMultiplier;
 
   @Inject
-  public DataTransferFactory(@Parameter(JobConf.HashRangeMultiplier.class) 
final int hashRangeMultiplier,
-                             final BlockManagerWorker blockManagerWorker) {
+  private DataTransferFactory(@Parameter(JobConf.HashRangeMultiplier.class) 
final int hashRangeMultiplier,
+                              final BlockManagerWorker blockManagerWorker) {
     this.hashRangeMultiplier = hashRangeMultiplier;
     this.blockManagerWorker = blockManagerWorker;
   }
@@ -41,7 +41,7 @@ public 
DataTransferFactory(@Parameter(JobConf.HashRangeMultiplier.class) final i
   /**
    * Creates an {@link OutputWriter} between two stages.
    *
-   * @param srcIRVertex     the {@link IRVertex} that outputs the data to be 
written.
+   * @param srcIRVertex the {@link IRVertex} that outputs the data to be 
written.
    * @param srcTaskIdx  the index of the source task.
    * @param dstIRVertex the {@link IRVertex} that will take the output data as 
its input.
    * @param runtimeEdge that connects the srcTask to the tasks belonging to 
dstIRVertex.
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
index bb594f622..1744bb273 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -53,12 +53,12 @@
    * @param runtimeEdge         the {@link RuntimeEdge}.
    * @param blockManagerWorker  the {@link BlockManagerWorker}.
    */
-  public OutputWriter(final int hashRangeMultiplier,
-                      final int srcTaskIdx,
-                      final String srcRuntimeVertexId,
-                      final IRVertex dstIrVertex,
-                      final RuntimeEdge<?> runtimeEdge,
-                      final BlockManagerWorker blockManagerWorker) {
+  OutputWriter(final int hashRangeMultiplier,
+               final int srcTaskIdx,
+               final String srcRuntimeVertexId,
+               final IRVertex dstIrVertex,
+               final RuntimeEdge<?> runtimeEdge,
+               final BlockManagerWorker blockManagerWorker) {
     super(runtimeEdge.getId());
     this.blockId = RuntimeIdGenerator.generateBlockId(getId(), srcTaskIdx);
     this.runtimeEdge = runtimeEdge;
diff --git 
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index df9454a9d..f80168519 100644
--- 
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -91,16 +91,21 @@
     SourceVertex.class, ClientRPC.class, MetricManagerMaster.class})
 public final class DataTransferTest {
   private static final String EXECUTOR_ID_PREFIX = "Executor";
-  private static final InterTaskDataStoreProperty.Value MEMORY_STORE = 
InterTaskDataStoreProperty.Value.MemoryStore;
-  private static final InterTaskDataStoreProperty.Value SER_MEMORY_STORE = 
InterTaskDataStoreProperty.Value.SerializedMemoryStore;
-  private static final InterTaskDataStoreProperty.Value LOCAL_FILE_STORE = 
InterTaskDataStoreProperty.Value.LocalFileStore;
-  private static final InterTaskDataStoreProperty.Value REMOTE_FILE_STORE = 
InterTaskDataStoreProperty.Value.GlusterFileStore;
+  private static final InterTaskDataStoreProperty.Value MEMORY_STORE =
+      InterTaskDataStoreProperty.Value.MemoryStore;
+  private static final InterTaskDataStoreProperty.Value SER_MEMORY_STORE =
+      InterTaskDataStoreProperty.Value.SerializedMemoryStore;
+  private static final InterTaskDataStoreProperty.Value LOCAL_FILE_STORE =
+      InterTaskDataStoreProperty.Value.LocalFileStore;
+  private static final InterTaskDataStoreProperty.Value REMOTE_FILE_STORE =
+      InterTaskDataStoreProperty.Value.GlusterFileStore;
   private static final String TMP_LOCAL_FILE_DIRECTORY = "./tmpLocalFiles";
   private static final String TMP_REMOTE_FILE_DIRECTORY = "./tmpRemoteFiles";
   private static final int PARALLELISM_TEN = 10;
   private static final String EDGE_PREFIX_TEMPLATE = "Dummy(%d)";
   private static final AtomicInteger TEST_INDEX = new AtomicInteger(0);
-  private static final EncoderFactory ENCODER_FACTORY = 
PairEncoderFactory.of(IntEncoderFactory.of(), IntEncoderFactory.of());
+  private static final EncoderFactory ENCODER_FACTORY =
+      PairEncoderFactory.of(IntEncoderFactory.of(), IntEncoderFactory.of());
   private static final DecoderFactory DECODER_FACTORY =
       PairDecoderFactory.of(IntDecoderFactory.of(), IntDecoderFactory.of());
   private static final Tang TANG = Tang.Factory.getTang();
@@ -108,6 +113,7 @@
 
   private BlockManagerMaster master;
   private BlockManagerWorker worker1;
+  private DataTransferFactory transferFactory;
   private BlockManagerWorker worker2;
   private HashMap<BlockManagerWorker, SerializerManager> serializerManagers = 
new HashMap<>();
 
@@ -152,10 +158,12 @@ public void setUp() throws InjectionException {
     injector2.bindVolatileParameter(JobConf.JobId.class, "data transfer test");
 
     this.master = master;
-    this.worker1 = createWorker(EXECUTOR_ID_PREFIX + 
executorCount.getAndIncrement(), messageDispatcher,
-        injector2);
+    final Pair<BlockManagerWorker, DataTransferFactory> pair1 =
+        createWorker(EXECUTOR_ID_PREFIX + executorCount.getAndIncrement(), 
messageDispatcher, injector2);
+    this.worker1 = pair1.left();
+    this.transferFactory = pair1.right();
     this.worker2 = createWorker(EXECUTOR_ID_PREFIX + 
executorCount.getAndIncrement(), messageDispatcher,
-        injector2);
+        injector2).left();
   }
 
   @After
@@ -164,8 +172,10 @@ public void tearDown() throws IOException {
     FileUtils.deleteDirectory(new File(TMP_REMOTE_FILE_DIRECTORY));
   }
 
-  private BlockManagerWorker createWorker(final String executorId, final 
LocalMessageDispatcher messageDispatcher,
-                                          final Injector nameClientInjector) {
+  private Pair<BlockManagerWorker, DataTransferFactory> createWorker(
+      final String executorId,
+      final LocalMessageDispatcher messageDispatcher,
+      final Injector nameClientInjector) {
     final LocalMessageEnvironment messageEnvironment = new 
LocalMessageEnvironment(executorId, messageDispatcher);
     final PersistentConnectionToMasterMap conToMaster = new 
PersistentConnectionToMasterMap(messageEnvironment);
     final Configuration executorConfiguration = TANG.newConfigurationBuilder()
@@ -180,11 +190,13 @@ private BlockManagerWorker createWorker(final String 
executorId, final LocalMess
     final BlockManagerWorker blockManagerWorker;
     final MetricManagerWorker metricManagerWorker;
     final SerializerManager serializerManager;
+    final DataTransferFactory dataTransferFactory;
     try {
       blockManagerWorker = injector.getInstance(BlockManagerWorker.class);
-      metricManagerWorker =  injector.getInstance(MetricManagerWorker.class);
+      metricManagerWorker = injector.getInstance(MetricManagerWorker.class);
       serializerManager = injector.getInstance(SerializerManager.class);
       serializerManagers.put(blockManagerWorker, serializerManager);
+      dataTransferFactory = injector.getInstance(DataTransferFactory.class);
     } catch (final InjectionException e) {
       throw new RuntimeException(e);
     }
@@ -195,11 +207,11 @@ private BlockManagerWorker createWorker(final String 
executorId, final LocalMess
         conToMaster,
         messageEnvironment,
         serializerManager,
-        new DataTransferFactory(HASH_RANGE_MULTIPLIER, blockManagerWorker),
+        dataTransferFactory,
         metricManagerWorker);
     injector.bindVolatileInstance(Executor.class, executor);
 
-    return blockManagerWorker;
+    return Pair.of(blockManagerWorker, dataTransferFactory);
   }
 
   private Injector createNameClientInjector() {
@@ -336,8 +348,7 @@ private void writeAndRead(final BlockManagerWorker sender,
     final List<List> dataWrittenList = new ArrayList<>();
     IntStream.range(0, PARALLELISM_TEN).forEach(srcTaskIndex -> {
       final List dataWritten = getRangedNumList(0, PARALLELISM_TEN);
-      final OutputWriter writer = new OutputWriter(HASH_RANGE_MULTIPLIER, 
srcTaskIndex, srcVertex.getId(), dstVertex,
-          dummyEdge, sender);
+      final OutputWriter writer = transferFactory.createWriter(srcVertex, 
srcTaskIndex, dstVertex, dummyEdge);
       dataWritten.iterator().forEachRemaining(writer::write);
       writer.close();
       dataWrittenList.add(dataWritten);
@@ -432,14 +443,12 @@ private void writeAndReadWithDuplicateData(final 
BlockManagerWorker sender,
     final List<List> dataWrittenList = new ArrayList<>();
     IntStream.range(0, PARALLELISM_TEN).forEach(srcTaskIndex -> {
       final List dataWritten = getRangedNumList(0, PARALLELISM_TEN);
-      final OutputWriter writer = new OutputWriter(HASH_RANGE_MULTIPLIER, 
srcTaskIndex, srcVertex.getId(), dstVertex,
-          dummyEdge, sender);
+      final OutputWriter writer = transferFactory.createWriter(srcVertex, 
srcTaskIndex, dstVertex, dummyEdge);
       dataWritten.iterator().forEachRemaining(writer::write);
       writer.close();
       dataWrittenList.add(dataWritten);
 
-      final OutputWriter writer2 = new OutputWriter(HASH_RANGE_MULTIPLIER, 
srcTaskIndex, srcVertex.getId(), dstVertex,
-          dummyEdge2, sender);
+      final OutputWriter writer2 = transferFactory.createWriter(srcVertex, 
srcTaskIndex, dstVertex, dummyEdge2);
       dataWritten.iterator().forEachRemaining(writer2::write);
       writer2.close();
     });
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
 
b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
index 5d24f8127..ab1beb7b7 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
+++ 
b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
@@ -63,9 +63,8 @@ public void testSailfish() {
                 
edgeToMerger.getPropertyValue(UsedDataHandlingProperty.class).get());
             
assertEquals(InterTaskDataStoreProperty.Value.SerializedMemoryStore,
                 
edgeToMerger.getPropertyValue(InterTaskDataStoreProperty.class).get());
-            // TODO #125: Fix data loss bug caused by SailfishSchedulingPolicy
-            //assertEquals(BytesDecoderFactory.of(),
-            //    edgeToMerger.getPropertyValue(DecoderProperty.class).get());
+            assertEquals(BytesDecoderFactory.of(),
+                edgeToMerger.getPropertyValue(DecoderProperty.class).get());
           } else {
             assertEquals(DataFlowModelProperty.Value.Pull,
                 
edgeToMerger.getPropertyValue(DataFlowModelProperty.class).get());
@@ -78,9 +77,8 @@ public void testSailfish() {
               
edgeFromMerger.getPropertyValue(DataCommunicationPatternProperty.class).get());
           assertEquals(InterTaskDataStoreProperty.Value.LocalFileStore,
               
edgeFromMerger.getPropertyValue(InterTaskDataStoreProperty.class).get());
-          // TODO #125: Fix data loss bug caused by SailfishSchedulingPolicy
-          //assertEquals(BytesEncoderFactory.of(),
-          //    edgeFromMerger.getPropertyValue(EncoderProperty.class).get());
+          assertEquals(BytesEncoderFactory.of(),
+              edgeFromMerger.getPropertyValue(EncoderProperty.class).get());
         });
       } else {
         // Non merger vertex.


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to