johnyangk closed pull request #153: [NEMO-245,247] Handle watermark in 
OutputWriter and Implement unbounded word count example
URL: https://github.com/apache/incubator-nemo/pull/153
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not keep its
diff visible after the merge, so the full diff is reproduced below:

diff --git 
a/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java 
b/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
index 279f7c57f..1bb8185e8 100644
--- a/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
+++ b/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
@@ -19,7 +19,10 @@
 package org.apache.nemo.common.coder;
 
 import org.apache.nemo.common.DirectByteArrayOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -27,6 +30,7 @@
  * A {@link DecoderFactory} which is used for an array of bytes.
  */
 public final class BytesDecoderFactory implements DecoderFactory<byte[]> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BytesDecoderFactory.class.getName());
 
   private static final BytesDecoderFactory BYTES_DECODER_FACTORY = new 
BytesDecoderFactory();
 
@@ -84,7 +88,7 @@ private BytesDecoder(final InputStream inputStream) {
           returnedArray = true;
           return new byte[0];
         } else {
-          throw new IOException("EoF (empty partition)!"); // TODO #120: use 
EOF exception instead of IOException.
+          throw new EOFException("EoF (empty partition)!"); // TODO #120: use 
EOF exception instead of IOException.
         }
       }
       final byte[] resultBytes = new byte[lengthToRead]; // Read the size of 
this byte array.
diff --git 
a/common/src/main/java/org/apache/nemo/common/coder/BytesEncoderFactory.java 
b/common/src/main/java/org/apache/nemo/common/coder/BytesEncoderFactory.java
index 140f5ea24..3a5af266b 100644
--- a/common/src/main/java/org/apache/nemo/common/coder/BytesEncoderFactory.java
+++ b/common/src/main/java/org/apache/nemo/common/coder/BytesEncoderFactory.java
@@ -18,12 +18,16 @@
  */
 package org.apache.nemo.common.coder;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.*;
 
 /**
  * A {@link EncoderFactory} which is used for an array of bytes.
  */
 public final class BytesEncoderFactory implements EncoderFactory<byte[]> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BytesEncoderFactory.class.getName());
 
   private static final BytesEncoderFactory BYTES_ENCODER_FACTORY = new 
BytesEncoderFactory();
 
diff --git 
a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/DecoderProperty.java
 
b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/DecoderProperty.java
index bf6e90889..32406c9af 100644
--- 
a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/DecoderProperty.java
+++ 
b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/DecoderProperty.java
@@ -23,6 +23,7 @@
 
 /**
  * Decoder ExecutionProperty.
+ * TODO #276: Add NoCoder property value in Encoder/DecoderProperty
  */
 public final class DecoderProperty extends 
EdgeExecutionProperty<DecoderFactory> {
   /**
diff --git 
a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/EncoderProperty.java
 
b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/EncoderProperty.java
index cf931fc32..8e6385d78 100644
--- 
a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/EncoderProperty.java
+++ 
b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/EncoderProperty.java
@@ -23,6 +23,7 @@
 
 /**
  * EncoderFactory ExecutionProperty.
+ * TODO #276: Add NoCoder property value in Encoder/DecoderProperty
  */
 public final class EncoderProperty extends 
EdgeExecutionProperty<EncoderFactory> {
   /**
diff --git 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
index b0dbe543d..cd713d383 100644
--- 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
+++ 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
@@ -20,6 +20,8 @@
 
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.punctuation.Watermark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A {@link Transform} relays input data from upstream vertex to downstream 
vertex promptly.
@@ -28,6 +30,7 @@
  */
 public final class RelayTransform<T> implements Transform<T, T> {
   private OutputCollector<T> outputCollector;
+  private static final Logger LOG = 
LoggerFactory.getLogger(RelayTransform.class.getName());
 
   /**
    * Default constructor.
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java
index 4cfefe8dd..c1ff6f0d7 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java
@@ -119,6 +119,11 @@ private BeamDecoder(final InputStream inputStream,
     public T2 decode() throws IOException {
       return decodeInternal();
     }
+
+    @Override
+    public String toString() {
+      return "BeamDecoder: {" + beamCoder.toString() + "}";
+    }
   }
 
   /**
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java
index e46f5b00b..090c24b3b 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java
@@ -22,6 +22,8 @@
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -31,6 +33,7 @@
  * @param <T> the type of element to encode.
  */
 public final class BeamEncoderFactory<T> implements EncoderFactory<T> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BeamEncoderFactory.class.getName());
 
   private final Coder<T> beamCoder;
 
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
index 6a8f8d49a..dd5ca35be 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -65,9 +65,9 @@
   private transient DoFnInvoker<InterT, OutputT> doFnInvoker;
   private transient DoFnRunners.OutputManager outputManager;
 
-  // For bundle
-  // we consider count and time millis for start/finish bundle
-  // if # of processed elements > bundleSize
+  // Variables for bundle.
+  // We consider count and time millis for start/finish bundle.
+  // If # of processed elements > bundleSize
   // or elapsed time > bundleMillis, we finish the current bundle and start a 
new one
   private transient long bundleSize;
   private transient long bundleMillis;
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
index 18368c601..9f7a4e0f7 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
@@ -27,6 +27,8 @@
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.punctuation.Watermark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.List;
@@ -39,6 +41,7 @@
  * @param <OutputT> output type.
  */
 public final class DoFnTransform<InputT, OutputT> extends 
AbstractDoFnTransform<InputT, InputT, OutputT> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DoFnTransform.class.getName());
 
   /**
    * DoFnTransform Constructor.
diff --git 
a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java
 
b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java
index 84992252a..c34615e70 100644
--- 
a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java
+++ 
b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java
@@ -53,11 +53,11 @@ public LargeShuffleRelayReshapingPass() {
       // We care about OperatorVertices that have any incoming edge that
       // has Shuffle as data communication pattern.
       if (v instanceof OperatorVertex && 
dag.getIncomingEdgesOf(v).stream().anyMatch(irEdge ->
-              CommunicationPatternProperty.Value.Shuffle
+        CommunicationPatternProperty.Value.Shuffle
           
.equals(irEdge.getPropertyValue(CommunicationPatternProperty.class).get()))) {
         dag.getIncomingEdgesOf(v).forEach(edge -> {
           if (CommunicationPatternProperty.Value.Shuffle
-                
.equals(edge.getPropertyValue(CommunicationPatternProperty.class).get())) {
+            
.equals(edge.getPropertyValue(CommunicationPatternProperty.class).get())) {
             // Insert a merger vertex having transform that write received 
data immediately
             // before the vertex receiving shuffled data.
             final OperatorVertex iFileMergerVertex = new OperatorVertex(new 
RelayTransform());
@@ -67,7 +67,7 @@ public LargeShuffleRelayReshapingPass() {
               new IREdge(CommunicationPatternProperty.Value.Shuffle, 
edge.getSrc(), iFileMergerVertex);
             edge.copyExecutionPropertiesTo(newEdgeToMerger);
             final IREdge newEdgeFromMerger = new 
IREdge(CommunicationPatternProperty.Value.OneToOne,
-                iFileMergerVertex, v);
+              iFileMergerVertex, v);
             
newEdgeFromMerger.setProperty(EncoderProperty.of(edge.getPropertyValue(EncoderProperty.class).get()));
             
newEdgeFromMerger.setProperty(DecoderProperty.of(edge.getPropertyValue(DecoderProperty.class).get()));
             builder.connectVertices(newEdgeToMerger);
diff --git 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
index e3fa23eaa..f9a44ec96 100644
--- 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
+++ 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
@@ -26,11 +26,9 @@
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.reef.io.Tuple;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Test;
diff --git 
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
 
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
index 535376392..d7f8c85c8 100644
--- 
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
+++ 
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
@@ -19,6 +19,7 @@
 package org.apache.nemo.examples.beam;
 
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.*;
@@ -26,6 +27,7 @@
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
 import org.joda.time.Duration;
@@ -41,19 +43,65 @@
   private WindowedWordCount() {
   }
 
+  public static final String INPUT_TYPE_BOUNDED = "bounded";
+  public static final String INPUT_TYPE_UNBOUNDED = "unbounded";
+  private static final String SPLITTER = "!";
+
+
+  private static PCollection<KV<String, Long>> getSource(
+    final Pipeline p,
+    final String[] args) {
+
+    final String inputType = args[2];
+    if (inputType.compareTo(INPUT_TYPE_BOUNDED) == 0) {
+      final String inputFilePath = args[3];
+      return GenericSourceSink.read(p, inputFilePath)
+        .apply(ParDo.of(new DoFn<String, String>() {
+          @ProcessElement
+          public void processElement(@Element final String elem,
+                                     final OutputReceiver<String> out) {
+            final String[] splitt = elem.split(SPLITTER);
+            out.outputWithTimestamp(splitt[0], new 
Instant(Long.valueOf(splitt[1])));
+          }
+        }))
+        .apply(MapElements.<String, KV<String, Long>>via(new 
SimpleFunction<String, KV<String, Long>>() {
+          @Override
+          public KV<String, Long> apply(final String line) {
+            final String[] words = line.split(" +");
+            final String documentId = words[0] + "#" + words[1];
+            final Long count = Long.parseLong(words[2]);
+            return KV.of(documentId, count);
+          }
+        }));
+    } else if (inputType.compareTo(INPUT_TYPE_UNBOUNDED) == 0) {
+      // unbounded
+      return p.apply(GenerateSequence
+        .from(1)
+        .withRate(2, Duration.standardSeconds(1))
+        .withTimestampFn(num -> new Instant(num * 500)))
+        .apply(MapElements.via(new SimpleFunction<Long, KV<String, Long>>() {
+          @Override
+          public KV<String, Long> apply(final Long val) {
+            return KV.of(String.valueOf(val % 2), 1L);
+          }
+        }));
+    } else {
+      throw new RuntimeException("Unsupported input type: " + inputType);
+    }
+  }
   /**
    * Main function for the MR BEAM program.
    * @param args arguments.
    */
   public static void main(final String[] args) {
-    final String inputFilePath = args[0];
-    final String outputFilePath = args[1];
-    final String windowType = args[2];
-    final Window<String> windowFn;
+    final String outputFilePath = args[0];
+    final String windowType = args[1];
+
+    final Window<KV<String, Long>> windowFn;
     if (windowType.equals("fixed")) {
-      windowFn = 
Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)));
+      windowFn = Window.<KV<String, 
Long>>into(FixedWindows.of(Duration.standardSeconds(5)));
     } else {
-      windowFn = 
Window.<String>into(SlidingWindows.of(Duration.standardSeconds(10))
+      windowFn = Window.<KV<String, 
Long>>into(SlidingWindows.of(Duration.standardSeconds(10))
         .every(Duration.standardSeconds(5)));
     }
 
@@ -62,33 +110,18 @@ public static void main(final String[] args) {
     options.setJobName("WindowedWordCount");
 
     final Pipeline p = Pipeline.create(options);
-    GenericSourceSink.read(p, inputFilePath)
-        .apply(ParDo.of(new DoFn<String, String>() {
-            @ProcessElement
-            public void processElement(@Element final String elem,
-                                       final OutputReceiver<String> out) {
-              final String[] splitt = elem.split("!");
-              out.outputWithTimestamp(splitt[0], new 
Instant(Long.valueOf(splitt[1])));
-            }
-        }))
-        .apply(windowFn)
-        .apply(MapElements.<String, KV<String, Long>>via(new 
SimpleFunction<String, KV<String, Long>>() {
-          @Override
-          public KV<String, Long> apply(final String line) {
-            final String[] words = line.split(" +");
-            final String documentId = words[0] + "#" + words[1];
-            final Long count = Long.parseLong(words[2]);
-            return KV.of(documentId, count);
-          }
-        }))
-        .apply(Sum.longsPerKey())
-        .apply(MapElements.<KV<String, Long>, String>via(new 
SimpleFunction<KV<String, Long>, String>() {
-          @Override
-          public String apply(final KV<String, Long> kv) {
-            return kv.getKey() + ": " + kv.getValue();
-          }
-        }))
-        .apply(new WriteOneFilePerWindow(outputFilePath, null));
+
+    getSource(p, args)
+      .apply(windowFn)
+      .apply(Sum.longsPerKey())
+      .apply(MapElements.<KV<String, Long>, String>via(new 
SimpleFunction<KV<String, Long>, String>() {
+        @Override
+        public String apply(final KV<String, Long> kv) {
+          return kv.getKey() + ": " + kv.getValue();
+        }
+      }))
+      .apply(new WriteOneFilePerWindow(outputFilePath, 1));
+
     p.run();
   }
 }
diff --git 
a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
 
b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
index 55ed19dcb..c0134aa33 100644
--- 
a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
+++ 
b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
@@ -28,6 +28,9 @@
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import static 
org.apache.nemo.examples.beam.WindowedWordCount.INPUT_TYPE_BOUNDED;
+import static 
org.apache.nemo.examples.beam.WindowedWordCount.INPUT_TYPE_UNBOUNDED;
+
 /**
  * Test Windowed word count program with JobLauncher.
  */
@@ -51,7 +54,7 @@
   public void testBatchFixedWindow() throws Exception {
     builder = new ArgBuilder()
       .addUserMain(WindowedWordCount.class.getCanonicalName())
-      .addUserArgs(inputFilePath, outputFilePath, "fixed");
+      .addUserArgs(outputFilePath, "fixed", INPUT_TYPE_BOUNDED, inputFilePath);
 
     JobLauncher.main(builder
         .addResourceJson(executorResourceFileName)
@@ -71,7 +74,7 @@ public void testBatchFixedWindow() throws Exception {
   public void testBatchSlidingWindow() throws Exception {
     builder = new ArgBuilder()
       .addUserMain(WindowedWordCount.class.getCanonicalName())
-      .addUserArgs(inputFilePath, outputFilePath, "sliding");
+      .addUserArgs(outputFilePath, "sliding", INPUT_TYPE_BOUNDED, 
inputFilePath);
 
     JobLauncher.main(builder
       .addResourceJson(executorResourceFileName)
@@ -91,7 +94,7 @@ public void testStreamingSchedulerAndPipeFixedWindow() throws 
Exception {
     builder = new ArgBuilder()
       
.addScheduler("org.apache.nemo.runtime.master.scheduler.StreamingScheduler")
       .addUserMain(WindowedWordCount.class.getCanonicalName())
-      .addUserArgs(inputFilePath, outputFilePath, "fixed");
+      .addUserArgs(outputFilePath, "fixed", INPUT_TYPE_BOUNDED, inputFilePath);
 
     JobLauncher.main(builder
       .addResourceJson(executorResourceFileName)
@@ -112,7 +115,7 @@ public void testStreamingSchedulerAndPipeSlidingWindow() 
throws Exception {
     builder = new ArgBuilder()
       
.addScheduler("org.apache.nemo.runtime.master.scheduler.StreamingScheduler")
       .addUserMain(WindowedWordCount.class.getCanonicalName())
-      .addUserArgs(inputFilePath, outputFilePath, "sliding");
+      .addUserArgs(outputFilePath, "sliding", INPUT_TYPE_BOUNDED, 
inputFilePath);
 
     JobLauncher.main(builder
       .addResourceJson(executorResourceFileName)
@@ -126,4 +129,25 @@ public void testStreamingSchedulerAndPipeSlidingWindow() 
throws Exception {
       ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
     }
   }
+
+
+  // TODO #271: We currently disable this test because we cannot force close 
Nemo
+  //@Test (timeout = TIMEOUT)
+  public void testUnboundedSlidingWindow() throws Exception {
+    builder = new ArgBuilder()
+      
.addScheduler("org.apache.nemo.runtime.master.scheduler.StreamingScheduler")
+      .addUserMain(WindowedWordCount.class.getCanonicalName())
+      .addUserArgs(outputFilePath, "sliding", INPUT_TYPE_UNBOUNDED);
+
+    JobLauncher.main(builder
+      .addResourceJson(executorResourceFileName)
+      .addJobId(WindowedWordCountITCase.class.getSimpleName())
+      
.addOptimizationPolicy(StreamingPolicyParallelismFive.class.getCanonicalName())
+      .build());
+
+    try {
+      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, 
expectedSlidingWindowOutputFileName);
+    } finally {
+    }
+  }
 }
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
index b218bf886..f7aa184a8 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
@@ -19,6 +19,10 @@
 package org.apache.nemo.runtime.executor;
 
 import com.google.protobuf.ByteString;
+import org.apache.nemo.common.coder.BytesDecoderFactory;
+import org.apache.nemo.common.coder.BytesEncoderFactory;
+import org.apache.nemo.common.coder.DecoderFactory;
+import org.apache.nemo.common.coder.EncoderFactory;
 import org.apache.nemo.common.dag.DAG;
 import org.apache.nemo.common.ir.edge.executionproperty.DecoderProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.DecompressionProperty;
@@ -39,6 +43,8 @@
 import org.apache.nemo.runtime.executor.data.BroadcastManagerWorker;
 import org.apache.nemo.runtime.executor.data.SerializerManager;
 import org.apache.nemo.runtime.executor.datatransfer.IntermediateDataIOFactory;
+import org.apache.nemo.runtime.executor.datatransfer.NemoEventDecoderFactory;
+import org.apache.nemo.runtime.executor.datatransfer.NemoEventEncoderFactory;
 import org.apache.nemo.runtime.executor.task.TaskExecutor;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
@@ -114,6 +120,7 @@ private synchronized void onTaskReceived(final Task task) {
    * @param task to launch.
    */
   private void launchTask(final Task task) {
+    LOG.info("Launch task: {}", task);
     try {
       final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag =
           SerializationUtils.deserialize(task.getSerializedIRDag());
@@ -121,19 +128,19 @@ private void launchTask(final Task task) {
           new TaskStateManager(task, executorId, 
persistentConnectionToMasterMap, metricMessageSender);
 
       task.getTaskIncomingEdges().forEach(e -> 
serializerManager.register(e.getId(),
-          e.getPropertyValue(EncoderProperty.class).get(),
-          e.getPropertyValue(DecoderProperty.class).get(),
+          getEncoderFactory(e.getPropertyValue(EncoderProperty.class).get()),
+          getDecoderFactory(e.getPropertyValue(DecoderProperty.class).get()),
           e.getPropertyValue(CompressionProperty.class).orElse(null),
           e.getPropertyValue(DecompressionProperty.class).orElse(null)));
       task.getTaskOutgoingEdges().forEach(e -> 
serializerManager.register(e.getId(),
-          e.getPropertyValue(EncoderProperty.class).get(),
-          e.getPropertyValue(DecoderProperty.class).get(),
+          getEncoderFactory(e.getPropertyValue(EncoderProperty.class).get()),
+          getDecoderFactory(e.getPropertyValue(DecoderProperty.class).get()),
           e.getPropertyValue(CompressionProperty.class).orElse(null),
           e.getPropertyValue(DecompressionProperty.class).orElse(null)));
       irDag.getVertices().forEach(v -> {
         irDag.getOutgoingEdgesOf(v).forEach(e -> 
serializerManager.register(e.getId(),
-            e.getPropertyValue(EncoderProperty.class).get(),
-            e.getPropertyValue(DecoderProperty.class).get(),
+            getEncoderFactory(e.getPropertyValue(EncoderProperty.class).get()),
+            getDecoderFactory(e.getPropertyValue(DecoderProperty.class).get()),
             e.getPropertyValue(CompressionProperty.class).orElse(null),
             e.getPropertyValue(DecompressionProperty.class).orElse(null)));
       });
@@ -155,6 +162,36 @@ private void launchTask(final Task task) {
     }
   }
 
+  /**
+   * This wraps the encoder with NemoEventEncoder.
+   * If the encoder is BytesEncoderFactory, we do not wrap the encoder.
+   * TODO #276: Add NoCoder property value in Encoder/DecoderProperty
+   * @param encoderFactory encoder factory
+   * @return wrapped encoder
+   */
+  private EncoderFactory getEncoderFactory(final EncoderFactory 
encoderFactory) {
+    if (encoderFactory instanceof BytesEncoderFactory) {
+      return encoderFactory;
+    } else {
+      return new NemoEventEncoderFactory(encoderFactory);
+    }
+  }
+
+  /**
+   * This wraps the decoder with NemoEventDecoder.
+   * If the decoder is BytesDecoderFactory, we do not wrap the decoder.
+   * TODO #276: Add NoCoder property value in Encoder/DecoderProperty
+   * @param decoderFactory decoder factory
+   * @return wrapped decoder
+   */
+  private DecoderFactory getDecoderFactory(final DecoderFactory 
decoderFactory) {
+    if (decoderFactory instanceof BytesDecoderFactory) {
+      return decoderFactory;
+    } else {
+      return new NemoEventDecoderFactory(decoderFactory);
+    }
+  }
+
   public void terminate() {
     try {
       metricMessageSender.close();
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
index 51cd7e117..315760c99 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
@@ -22,6 +22,8 @@
 import org.apache.nemo.runtime.executor.data.partition.SerializedPartition;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
@@ -37,6 +39,7 @@
  * although the execution order may not be linearized if they were called from 
different threads.</p>
  */
 public final class ByteOutputContext extends ByteTransferContext implements 
AutoCloseable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ByteOutputContext.class.getName());
 
   private final Channel channel;
 
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
index 9cb7b237f..80e83df4e 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
@@ -144,6 +144,8 @@ private static void serializePartition(final EncoderFactory 
encoderFactory,
     final List<NonSerializedPartition<K>> nonSerializedPartitions = new 
ArrayList<>();
     for (final SerializedPartition<K> partitionToConvert : 
partitionsToConvert) {
       final K key = partitionToConvert.getKey();
+
+
       try (final ByteArrayInputStream byteArrayInputStream =
                new ByteArrayInputStream(partitionToConvert.getData())) {
         final NonSerializedPartition<K> deserializePartition = 
deserializePartition(
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java
index 952fbf457..71541c2b2 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java
@@ -116,6 +116,7 @@ public void commit() throws IOException {
       // inner buffer directly, which can be an unfinished(not flushed) buffer.
       wrappedStream.close();
       this.serializedData = bytesOutputStream.getBufDirectly();
+
       this.length = bytesOutputStream.getCount();
       this.committed = true;
     }
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
index 239153381..4b85087fd 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
@@ -21,11 +21,14 @@
 import org.apache.nemo.common.ir.edge.executionproperty.*;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.plan.RuntimeEdge;
 import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
 import org.apache.nemo.runtime.executor.data.block.Block;
 import org.apache.nemo.runtime.executor.data.partitioner.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.Optional;
@@ -34,6 +37,8 @@
  * Represents the output data transfer from a task.
  */
 public final class BlockOutputWriter implements OutputWriter {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BlockOutputWriter.class.getName());
+
   private final RuntimeEdge<?> runtimeEdge;
   private final IRVertex dstIrVertex;
   private final Partitioner partitioner;
@@ -88,6 +93,11 @@ public void write(final Object element) {
     } // If else, does not need to write because the data is duplicated.
   }
 
+  @Override
+  public void writeWatermark(final Watermark watermark) {
+    // do nothing
+  }
+
   /**
    * Notifies that all writes for a block is end.
    * Further write about a committed block will throw an exception.
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
index 56c754038..d50ad82fc 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
@@ -31,13 +31,19 @@
 public final class DataFetcherOutputCollector<O> implements OutputCollector<O> 
{
   private static final Logger LOG = 
LoggerFactory.getLogger(DataFetcherOutputCollector.class.getName());
   private final OperatorVertex nextOperatorVertex;
+  private final int edgeIndex;
+  private final InputWatermarkManager watermarkManager;
 
   /**
    * It forwards output to the next operator.
    * @param nextOperatorVertex next operator to emit data and watermark
+   * @param edgeIndex index of the incoming edge; passed to the watermark manager with every watermark
+   * @param watermarkManager manager that receives the watermarks emitted through this collector
    */
-  public DataFetcherOutputCollector(final OperatorVertex nextOperatorVertex) {
+  public DataFetcherOutputCollector(final OperatorVertex nextOperatorVertex,
+                                    final int edgeIndex,
+                                    final InputWatermarkManager 
watermarkManager) {
     this.nextOperatorVertex = nextOperatorVertex;
+    this.edgeIndex = edgeIndex;
+    this.watermarkManager = watermarkManager;
   }
 
   @Override
@@ -47,7 +53,7 @@ public void emit(final O output) {
 
   @Override
   public void emitWatermark(final Watermark watermark) {
-    nextOperatorVertex.getTransform().onWatermark(watermark);
+    // Hand the watermark to the watermark manager, tagged with this collector's edge index,
+    // instead of invoking the next operator's transform directly.
+    watermarkManager.trackAndEmitWatermarks(edgeIndex, watermark);
   }
 
   @Override
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManager.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManager.java
index 66fb7aa81..adbb6596c 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManager.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManager.java
@@ -31,6 +31,8 @@
 
   /**
    * This tracks the minimum input watermark among multiple input streams.
+   * This method is not thread-safe, so the caller must synchronize calls to it
+   * if multiple threads access it concurrently.
    * Ex)
    * -- input stream1 (edge 1):  ---------- ts: 3 ------------------ts: 6
    *                                                                 ^^^
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
index 91c7c55c9..613eccc06 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
@@ -18,8 +18,10 @@
  */
 package org.apache.nemo.runtime.executor.datatransfer;
 
-import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.punctuation.Watermark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -28,14 +30,16 @@
  * This tracks the minimum input watermark among multiple input streams.
  */
 public final class MultiInputWatermarkManager implements InputWatermarkManager 
{
+  private static final Logger LOG = 
LoggerFactory.getLogger(MultiInputWatermarkManager.class.getName());
+
   private final List<Watermark> watermarks;
-  private final OperatorVertex nextOperator;
+  private final OutputCollector<?> watermarkCollector;
   private int minWatermarkIndex;
   public MultiInputWatermarkManager(final int numEdges,
-                                    final OperatorVertex nextOperator) {
+                                    final OutputCollector<?> 
watermarkCollector) {
     super();
     this.watermarks = new ArrayList<>(numEdges);
-    this.nextOperator = nextOperator;
+    this.watermarkCollector = watermarkCollector;
     this.minWatermarkIndex = 0;
     // We initialize watermarks as min value because
     // we should not emit watermark until all edges emit watermarks.
@@ -58,6 +62,12 @@ private int findNextMinWatermarkIndex() {
 
   @Override
   public void trackAndEmitWatermarks(final int edgeIndex, final Watermark 
watermark) {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Track watermark {} emitted from edge {}:, {}", 
watermark.getTimestamp(), edgeIndex,
+        watermarks.toString());
+    }
+
     if (edgeIndex == minWatermarkIndex) {
       // update min watermark
       final Watermark prevMinWatermark = watermarks.get(minWatermarkIndex);
@@ -74,7 +84,10 @@ public void trackAndEmitWatermarks(final int edgeIndex, 
final Watermark watermar
       if (minWatermark.getTimestamp() > prevMinWatermark.getTimestamp()) {
         // Watermark timestamp progress!
         // Emit the min watermark
-        nextOperator.getTransform().onWatermark(minWatermark);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Emit watermark {}, {}", minWatermark, watermarks);
+        }
+        watermarkCollector.emitWatermark(minWatermark);
       }
     } else {
       // The recent watermark timestamp cannot be less than the previous one
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java
new file mode 100644
index 000000000..1367e7084
--- /dev/null
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.nemo.common.coder.DecoderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A {@link DecoderFactory} that wraps another decoder factory and creates decoders for streams
+ * in which every element is preceded by a one-byte marker:
+ * {@code 0x00} for a data element and {@code 0x01} for a {@link WatermarkWithIndex}.
+ */
+public final class NemoEventDecoderFactory implements DecoderFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(NemoEventDecoderFactory.class.getName());
+
+  private final DecoderFactory valueDecoderFactory;
+
+  /**
+   * @param valueDecoderFactory the factory whose decoders decode the data elements
+   */
+  public NemoEventDecoderFactory(final DecoderFactory valueDecoderFactory) {
+    this.valueDecoderFactory = valueDecoderFactory;
+  }
+
+  /**
+   * Creates a decoder that reads markers, data elements and watermarks from the given stream.
+   * @param inputStream the stream to decode from; it is also handed to the value decoder factory
+   * @return a decoder returning either a data element or a {@link WatermarkWithIndex}
+   * @throws IOException if the value decoder factory fails to create its decoder
+   */
+  @Override
+  public Decoder create(final InputStream inputStream) throws IOException {
+    return new NemoEventDecoder(valueDecoderFactory.create(inputStream), inputStream);
+  }
+
+  /**
+   * This class decodes the received data into one of two types.
+   * - normal data
+   * - WatermarkWithIndex
+   */
+  private final class NemoEventDecoder implements DecoderFactory.Decoder {
+
+    private final Decoder valueDecoder;
+    private final InputStream inputStream;
+
+    NemoEventDecoder(final Decoder valueDecoder,
+                     final InputStream inputStream) {
+      this.valueDecoder = valueDecoder;
+      this.inputStream = inputStream;
+    }
+
+    /**
+     * Reads the one-byte marker and then decodes the element that follows it.
+     * @return the data element decoded by the value decoder, or a {@link WatermarkWithIndex}
+     * @throws EOFException if the stream has ended before a marker could be read
+     * @throws IOException if reading from the stream fails
+     * @throws RuntimeException if the marker is neither {@code 0x00} nor {@code 0x01}
+     */
+    @Override
+    public Object decode() throws IOException {
+      // Keep the int returned by read(): narrowing it to a byte first would turn a 0xFF
+      // marker byte into -1 and make corrupt data look like a clean end of stream.
+      final int marker = inputStream.read();
+      if (marker == -1) {
+        // end of the input stream
+        throw new EOFException();
+      }
+
+      if (marker == 0x00) {
+        // this is not a watermark
+        return valueDecoder.decode();
+      } else if (marker == 0x01) {
+        // this is a watermark
+        // NOTE(review): this is Java-native deserialization; it presumably only ever sees bytes
+        // written by a peer executor - confirm the stream can never carry untrusted data.
+        return (WatermarkWithIndex) SerializationUtils.deserialize(inputStream);
+      } else {
+        throw new RuntimeException("Watermark decoding failure: " + marker);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "NemoDecoder{" + valueDecoder.toString() + "}";
+    }
+  }
+}
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java
new file mode 100644
index 000000000..c49beda80
--- /dev/null
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.nemo.common.coder.EncoderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * An {@link EncoderFactory} that decorates another encoder factory so that every encoded element
+ * is prefixed with a one-byte marker telling watermarks apart from data elements.
+ */
+public final class NemoEventEncoderFactory implements EncoderFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(NemoEventEncoderFactory.class.getName());
+
+  private final EncoderFactory valueEncoderFactory;
+
+  public NemoEventEncoderFactory(final EncoderFactory valueEncoderFactory) {
+    this.valueEncoderFactory = valueEncoderFactory;
+  }
+
+  @Override
+  public Encoder create(final OutputStream outputStream) throws IOException {
+    final Encoder delegate = valueEncoderFactory.create(outputStream);
+    return new NemoEventEncoder(delegate, outputStream);
+  }
+
+  /**
+   * Encoder for both kinds of events: data elements go through the wrapped value encoder,
+   * {@link WatermarkWithIndex} instances are written in Java-serialized form.
+   * @param <T> the type of the elements to encode
+   */
+  private final class NemoEventEncoder<T> implements EncoderFactory.Encoder<T> {
+    private final EncoderFactory.Encoder<T> valueEncoder;
+    private final OutputStream outputStream;
+
+    NemoEventEncoder(final EncoderFactory.Encoder<T> valueEncoder,
+                     final OutputStream outputStream) {
+      this.valueEncoder = valueEncoder;
+      this.outputStream = outputStream;
+    }
+
+    @Override
+    public void encode(final T element) throws IOException {
+      if (!(element instanceof WatermarkWithIndex)) {
+        outputStream.write(0x00); // this is a data element
+        valueEncoder.encode(element);
+        return;
+      }
+
+      outputStream.write(0x01); // this is watermark
+      final byte[] serializedWatermark = SerializationUtils.serialize((Serializable) element);
+      outputStream.write(serializedWatermark);
+    }
+  }
+}
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
index 3637780c1..12d993261 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
@@ -77,6 +77,7 @@ private void emit(final OutputWriter writer, final O output) {
 
   @Override
   public void emit(final O output) {
+
     for (final NextIntraTaskOperatorInfo internalVertex : internalMainOutputs) 
{
       emit(internalVertex.getNextOperator(), output);
     }
@@ -104,6 +105,11 @@ public void emit(final O output) {
 
   @Override
   public void emitWatermark(final Watermark watermark) {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("{} emits watermark {}", irVertex.getId(), watermark);
+    }
+
     // Emit watermarks to internal vertices
     for (final NextIntraTaskOperatorInfo internalVertex : internalMainOutputs) 
{
       
internalVertex.getWatermarkManager().trackAndEmitWatermarks(internalVertex.getEdgeIndex(),
 watermark);
@@ -115,7 +121,15 @@ public void emitWatermark(final Watermark watermark) {
       }
     }
 
-    // TODO #245: handle watermarks in OutputWriter
-    // TODO #245: currently ignore emitting watermarks to output writer
+    // Emit watermarks to output writer
+    for (final OutputWriter outputWriter : externalMainOutputs) {
+      outputWriter.writeWatermark(watermark);
+    }
+
+    for (final List<OutputWriter> externalVertices : 
externalAdditionalOutputs.values()) {
+      for (final OutputWriter externalVertex : externalVertices) {
+        externalVertex.writeWatermark(watermark);
+      }
+    }
   }
 }
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorWatermarkCollector.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorWatermarkCollector.java
new file mode 100644
index 000000000..66efb7239
--- /dev/null
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorWatermarkCollector.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * An {@link OutputCollector} that only accepts watermarks and hands them to the transform
+ * of a single {@link OperatorVertex}. It is the sink an InputWatermarkManager emits watermarks to.
+ */
+public final class OperatorWatermarkCollector implements OutputCollector {
+  private static final Logger LOG = LoggerFactory.getLogger(OperatorWatermarkCollector.class.getName());
+
+  private final OperatorVertex operatorVertex;
+
+  public OperatorWatermarkCollector(final OperatorVertex operatorVertex) {
+    this.operatorVertex = operatorVertex;
+  }
+
+  @Override
+  public void emitWatermark(final Watermark watermark) {
+    operatorVertex.getTransform().onWatermark(watermark);
+  }
+
+  @Override
+  public void emit(final Object output) {
+    throw dataNotSupported();
+  }
+
+  @Override
+  public void emit(final String dstVertexId, final Object output) {
+    throw dataNotSupported();
+  }
+
+  /**
+   * @return the exception thrown whenever a data element (rather than a watermark) is emitted
+   */
+  private static IllegalStateException dataNotSupported() {
+    return new IllegalStateException("Should not be called");
+  }
+}
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
index 032510a1a..301c95a94 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -23,6 +23,7 @@
 import org.apache.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
 import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.runtime.common.plan.RuntimeEdge;
 import org.apache.nemo.runtime.common.plan.StageEdge;
 import org.apache.nemo.runtime.executor.data.partitioner.*;
@@ -40,6 +41,12 @@
    */
   void write(final Object element);
 
+  /**
+   * Hands a watermark to this writer so that it can be forwarded along the writer's output edges.
+   * What is actually written is up to the implementation.
+   * @param watermark the watermark to write
+   */
+  void writeWatermark(final Watermark watermark);
+
   /**
    * @return the total written bytes.
    */
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
index a5dbf934b..dd7039416 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
@@ -21,6 +21,7 @@
 import org.apache.nemo.common.DirectByteArrayOutputStream;
 import org.apache.nemo.common.coder.EncoderFactory;
 import 
org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.plan.RuntimeEdge;
 import org.apache.nemo.runtime.executor.bytetransfer.ByteOutputContext;
@@ -33,6 +34,7 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
@@ -43,6 +45,7 @@
   private static final Logger LOG = 
LoggerFactory.getLogger(OutputWriter.class.getName());
 
   private final String srcTaskId;
+  private final int srcTaskIndex;
   private final PipeManagerWorker pipeManagerWorker;
 
   private final Partitioner partitioner;
@@ -70,6 +73,27 @@
     this.pipeManagerWorker.notifyMaster(runtimeEdge.getId(), 
RuntimeIdManager.getIndexFromTaskId(srcTaskId));
     this.partitioner = OutputWriter.getPartitioner(runtimeEdge, 
hashRangeMultiplier);
     this.runtimeEdge = runtimeEdge;
+    this.srcTaskIndex = RuntimeIdManager.getIndexFromTaskId(srcTaskId);
+  }
+
+  /**
+   * Serializes the element and writes the resulting bytes to each of the given pipes,
+   * opening (and closing) a fresh output stream per pipe.
+   * @param element the element to write
+   * @param pipeList the pipes to write the element to
+   */
+  private void writeData(final Object element, final List<ByteOutputContext> pipeList) {
+    for (final ByteOutputContext pipe : pipeList) {
+      try (final ByteOutputContext.ByteOutputStream pipeStream = pipe.newOutputStream()) {
+        // Serialize (Do not compress)
+        final DirectByteArrayOutputStream serializedBytes = new DirectByteArrayOutputStream();
+        final OutputStream encodeStream =
+          DataUtil.buildOutputStream(serializedBytes, serializer.getEncodeStreamChainers());
+        final EncoderFactory.Encoder elementEncoder = serializer.getEncoderFactory().create(encodeStream);
+        elementEncoder.encode(element);
+        encodeStream.close();
+
+        // Write
+        pipeStream.write(serializedBytes.getBufDirectly());
+      } catch (final IOException e) {
+        throw new RuntimeException(e); // For now we crash the executor on IOException
+      }
+    }
   }
 
   /**
@@ -82,19 +106,17 @@ public void write(final Object element) {
       doInitialize();
     }
 
-    try (final ByteOutputContext.ByteOutputStream pipeToWriteTo = 
getPipeToWrite(element)) {
-      // Serialize (Do not compress)
-      final DirectByteArrayOutputStream bytesOutputStream = new 
DirectByteArrayOutputStream();
-      final OutputStream wrapped = 
DataUtil.buildOutputStream(bytesOutputStream, 
serializer.getEncodeStreamChainers());
-      final EncoderFactory.Encoder encoder = 
serializer.getEncoderFactory().create(wrapped);
-      encoder.encode(element);
-      wrapped.close();
-
-      // Write
-      pipeToWriteTo.write(bytesOutputStream.getBufDirectly());
-    } catch (IOException e) {
-      throw new RuntimeException(e); // For now we crash the executor on 
IOException
+    writeData(element, getPipeToWrite(element));
+  }
+
+  /**
+   * Wraps the watermark together with this writer's source task index
+   * and writes it to every pipe in {@code pipes}.
+   * @param watermark the watermark to write
+   */
+  @Override
+  public void writeWatermark(final Watermark watermark) {
+    // Initialize lazily on first use.
+    if (!initialized) {
+      doInitialize();
     }
+
+    final WatermarkWithIndex watermarkWithIndex = new 
WatermarkWithIndex(watermark, srcTaskIndex);
+    writeData(watermarkWithIndex, pipes);
   }
 
   @Override
@@ -126,11 +148,11 @@ private void doInitialize() {
     this.serializer = pipeManagerWorker.getSerializer(runtimeEdge.getId());
   }
 
+  /**
+   * Selects the pipe a data element is written to.
+   * @param element the element to be written
+   * @return a single-element list: the first pipe for a OneToOne edge,
+   *         otherwise the pipe at the index chosen by the partitioner
+   */
-  private ByteOutputContext.ByteOutputStream getPipeToWrite(final Object 
element) throws IOException {
+  private List<ByteOutputContext> getPipeToWrite(final Object element) {
     return runtimeEdge.getPropertyValue(CommunicationPatternProperty.class)
       .get()
       .equals(CommunicationPatternProperty.Value.OneToOne)
-      ? pipes.get(0).newOutputStream()
-      : pipes.get((int) partitioner.partition(element)).newOutputStream();
+      ? Collections.singletonList(pipes.get(0))
+      : Collections.singletonList(pipes.get((int) 
partitioner.partition(element)));
   }
 }
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
index 204bf22c2..e8135f9f0 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
@@ -18,7 +18,7 @@
  */
 package org.apache.nemo.runtime.executor.datatransfer;
 
-import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.punctuation.Watermark;
 
 
@@ -27,10 +27,10 @@
  */
 public final class SingleInputWatermarkManager implements 
InputWatermarkManager {
 
-  private final OperatorVertex nextOperator;
+  private final OutputCollector watermarkCollector;
 
+  /**
+   * @param watermarkCollector the collector that receives every watermark passed to this manager
+   */
-  public SingleInputWatermarkManager(final OperatorVertex nextOperator) {
-    this.nextOperator = nextOperator;
+  public SingleInputWatermarkManager(final OutputCollector watermarkCollector) 
{
+    this.watermarkCollector = watermarkCollector;
   }
 
   /**
@@ -41,6 +41,6 @@ public SingleInputWatermarkManager(final OperatorVertex 
nextOperator) {
   @Override
   public void trackAndEmitWatermarks(final int edgeIndex,
                                      final Watermark watermark) {
-    nextOperator.getTransform().onWatermark(watermark);
+    // With a single input there is nothing to track: forward the watermark as is (edgeIndex is unused).
+    watermarkCollector.emitWatermark(watermark);
   }
 }
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/WatermarkWithIndex.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/WatermarkWithIndex.java
new file mode 100644
index 000000000..3db6cd5dc
--- /dev/null
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/WatermarkWithIndex.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.punctuation.Watermark;
+
+import java.io.Serializable;
+
+/**
+ * A serializable pair of a {@link Watermark} and the index of the source task that produced it.
+ * It is the form in which watermarks are transferred between tasks.
+ */
+public final class WatermarkWithIndex implements Serializable {
+  private final Watermark watermark;
+  private final int index;
+
+  /**
+   * @param watermark the watermark to transfer
+   * @param index the index of the source task
+   */
+  public WatermarkWithIndex(final Watermark watermark, final int index) {
+    this.watermark = watermark;
+    this.index = index;
+  }
+
+  /**
+   * @return the index of the source task
+   */
+  public int getIndex() {
+    return index;
+  }
+
+  /**
+   * @return the wrapped watermark
+   */
+  public Watermark getWatermark() {
+    return watermark;
+  }
+
+  @Override
+  public String toString() {
+    return watermark + " from " + index;
+  }
+}
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
index a9b0da3ce..001060c7a 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
@@ -21,8 +21,9 @@
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.punctuation.Finishmark;
+import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.runtime.executor.data.DataUtil;
-import org.apache.nemo.runtime.executor.datatransfer.InputReader;
+import org.apache.nemo.runtime.executor.datatransfer.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,9 +57,12 @@
   private long serBytes = 0;
   private long encodedBytes = 0;
 
-  private int numOfIterators;
+  private int numOfIterators; // == numOfIncomingEdges
   private int numOfFinishMarks = 0;
 
+  // A watermark manager
+  private InputWatermarkManager inputWatermarkManager;
+
   MultiThreadParentTaskDataFetcher(final IRVertex dataSource,
                                    final InputReader readerForParentTask,
                                    final OutputCollector outputCollector) {
@@ -96,6 +100,12 @@ private void fetchDataLazily() {
     final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = 
readersForParentTask.read();
     numOfIterators = futures.size();
 
+    if (numOfIterators > 1) {
+      inputWatermarkManager = new MultiInputWatermarkManager(numOfIterators, 
new WatermarkCollector());
+    } else {
+      inputWatermarkManager = new SingleInputWatermarkManager(new 
WatermarkCollector());
+    }
+
     futures.forEach(compFuture -> compFuture.whenComplete((iterator, 
exception) -> {
       // A thread for each iterator
       queueInsertionThreads.submit(() -> {
@@ -103,7 +113,21 @@ private void fetchDataLazily() {
           // Consume this iterator to the end.
           while (iterator.hasNext()) { // blocked on the iterator.
             final Object element = iterator.next();
-            elementQueue.offer(element);
+
+
+            if (element instanceof WatermarkWithIndex) {
+              // watermark element
+              // the input watermark manager is accessed by multiple threads
+              // so we should synchronize it
+              synchronized (inputWatermarkManager) {
+                final WatermarkWithIndex watermarkWithIndex = 
(WatermarkWithIndex) element;
+                inputWatermarkManager.trackAndEmitWatermarks(
+                  watermarkWithIndex.getIndex(), 
watermarkWithIndex.getWatermark());
+              }
+            } else {
+              // data element
+              elementQueue.offer(element);
+            }
           }
 
           // This iterator is finished.
@@ -147,4 +171,26 @@ private synchronized void countBytesSynchronized(final 
DataUtil.IteratorWithNumB
   public void close() throws Exception {
     queueInsertionThreads.shutdown();
   }
+
+  /**
+   * An {@link OutputCollector} that accepts only watermarks and puts each of them
+   * into the element queue. The InputWatermarkManager emits its watermarks to it.
+   */
+  private final class WatermarkCollector implements OutputCollector {
+
+    @Override
+    public void emitWatermark(final Watermark watermark) {
+      elementQueue.offer(watermark);
+    }
+
+    @Override
+    public void emit(final Object output) {
+      throw unsupportedEmit();
+    }
+
+    @Override
+    public void emit(final String dstVertexId, final Object output) {
+      throw unsupportedEmit();
+    }
+
+    /**
+     * @return the exception thrown whenever a data element (rather than a watermark) is emitted
+     */
+    private IllegalStateException unsupportedEmit() {
+      return new IllegalStateException("Should not be called");
+    }
+  }
 }
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
index fa4bd8a11..b42bd77b3 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
@@ -23,6 +23,8 @@
 import org.apache.nemo.common.ir.vertex.SourceVertex;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.common.punctuation.Finishmark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.NoSuchElementException;
@@ -34,6 +36,8 @@
  * Fetches data from a data source.
  */
 class SourceVertexDataFetcher extends DataFetcher {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SourceVertexDataFetcher.class.getName());
+
   private final Readable readable;
   private long boundedSourceReadTime = 0;
   private static final long WATERMARK_PERIOD = 1000; // ms
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index 8c924431b..518bff36f 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -21,6 +21,7 @@
 import com.google.common.collect.Lists;
 import org.apache.nemo.common.Pair;
 import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.dag.Edge;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.Readable;
 import 
org.apache.nemo.common.ir.edge.executionproperty.AdditionalOutputTagProperty;
@@ -124,6 +125,21 @@ public TaskExecutor(final Task task,
     this.sortedHarnesses = pair.right();
   }
 
+  /**
+   * Collects every edge that ends at the given vertex: the incoming edges from the vertex DAG
+   * first (intra-task), followed by the task's incoming stage edges whose destination vertex
+   * has the same id (inter-task).
+   * @param task the task whose incoming stage edges are searched
+   * @param irVertexDag the DAG of vertices to take the intra-task edges from
+   * @param childVertex the vertex whose incoming edges are collected
+   * @return the intra-task edges followed by the matching inter-task edges
+   */
+  private List<Edge> getAllIncomingEdges(
+    final Task task,
+    final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
+    final IRVertex childVertex) {
+    final List<Edge> incomingEdges = new ArrayList<>(irVertexDag.getIncomingEdgesOf(childVertex));
+    for (final StageEdge stageEdge : task.getTaskIncomingEdges()) {
+      if (stageEdge.getDstIRVertex().getId().equals(childVertex.getId())) {
+        incomingEdges.add(stageEdge);
+      }
+    }
+    return incomingEdges;
+  }
+
+
   /**
    * Converts the DAG of vertices into pointer-based DAG of vertex harnesses.
    * This conversion is necessary for constructing concrete data channels for 
each vertex's inputs and outputs.
@@ -158,11 +174,11 @@ public TaskExecutor(final Task task,
     // Build a map for edge as a key and edge index as a value
     // This variable is used for creating NextIntraTaskOperatorInfo
     // in {@link this#getInternalMainOutputs and this#internalMainOutputs}
-    final Map<RuntimeEdge<IRVertex>, Integer> edgeIndexMap = new HashMap<>();
+    final Map<Edge, Integer> edgeIndexMap = new HashMap<>();
     reverseTopologicallySorted.forEach(childVertex -> {
-      final List<RuntimeEdge<IRVertex>> edges = irVertexDag.getIncomingEdgesOf(childVertex);
+      final List<Edge> edges = getAllIncomingEdges(task, irVertexDag, childVertex);
       for (int edgeIndex = 0; edgeIndex < edges.size(); edgeIndex++) {
-        final RuntimeEdge<IRVertex> edge = edges.get(edgeIndex);
+        final Edge edge = edges.get(edgeIndex);
         edgeIndexMap.putIfAbsent(edge, edgeIndex);
       }
     });
@@ -174,13 +190,15 @@ public TaskExecutor(final Task task,
     reverseTopologicallySorted.forEach(childVertex -> {
 
       if (childVertex instanceof OperatorVertex) {
-        final List<RuntimeEdge<IRVertex>> edges = irVertexDag.getIncomingEdgesOf(childVertex);
+        final List<Edge> edges = getAllIncomingEdges(task, irVertexDag, childVertex);
         if (edges.size() == 1) {
           operatorWatermarkManagerMap.putIfAbsent(childVertex,
-            new SingleInputWatermarkManager((OperatorVertex) childVertex));
+            new SingleInputWatermarkManager(
+              new OperatorWatermarkCollector((OperatorVertex) childVertex)));
         } else {
           operatorWatermarkManagerMap.putIfAbsent(childVertex,
-            new MultiInputWatermarkManager(edges.size(), (OperatorVertex) childVertex));
+            new MultiInputWatermarkManager(edges.size(),
+              new OperatorWatermarkCollector((OperatorVertex) childVertex)));
         }
       }
 
@@ -257,23 +275,33 @@ public TaskExecutor(final Task task,
             .orElseThrow(() -> new IllegalStateException(inEdge.toString())),
           broadcastReaders.get(i));
       }
+
       // Parent-task read (non-broadcasts)
       final List<StageEdge> nonBroadcastInEdges = new ArrayList<>(inEdgesForThisVertex);
       nonBroadcastInEdges.removeAll(broadcastInEdges);
-      final List<InputReader> nonBroadcastReaders =
-        getParentTaskReaders(taskIndex, nonBroadcastInEdges, intermediateDataIOFactory);
-      nonBroadcastReaders.forEach(parentTaskReader -> {
-        final DataFetcher dataFetcher;
-        if (parentTaskReader instanceof PipeInputReader) {
-          nonBroadcastDataFetcherList.add(
-            new MultiThreadParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
-              new DataFetcherOutputCollector((OperatorVertex) irVertex)));
-        } else {
-          nonBroadcastDataFetcherList.add(
-            new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
-              new DataFetcherOutputCollector((OperatorVertex) irVertex)));
-        }
-      });
+
+      nonBroadcastInEdges
+        .stream()
+        .map(incomingEdge ->
+          Pair.of(incomingEdge, intermediateDataIOFactory
+            .createReader(taskIndex, incomingEdge.getSrcIRVertex(), incomingEdge)))
+        .forEach(pair -> {
+          if (irVertex instanceof OperatorVertex) {
+            final StageEdge edge = pair.left();
+            final int edgeIndex = edgeIndexMap.get(edge);
+            final InputWatermarkManager watermarkManager = operatorWatermarkManagerMap.get(irVertex);
+            final InputReader parentTaskReader = pair.right();
+            if (parentTaskReader instanceof PipeInputReader) {
+              nonBroadcastDataFetcherList.add(
+                new MultiThreadParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
+                  new DataFetcherOutputCollector((OperatorVertex) irVertex, edgeIndex, watermarkManager)));
+            } else {
+              nonBroadcastDataFetcherList.add(
+                new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
+                  new DataFetcherOutputCollector((OperatorVertex) irVertex, edgeIndex, watermarkManager)));
+            }
+          }
+        });
     });
 
     final List<VertexHarness> sortedHarnessList = irVertexDag.getTopologicalSort()
@@ -529,7 +557,7 @@ private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
   private Map<String, List<NextIntraTaskOperatorInfo>> getInternalAdditionalOutputMap(
     final IRVertex irVertex,
     final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
-    final Map<RuntimeEdge<IRVertex>, Integer> edgeIndexMap,
+    final Map<Edge, Integer> edgeIndexMap,
     final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap) {
     // Add all intra-task additional tags to additional output map.
     final Map<String, List<NextIntraTaskOperatorInfo>> map = new HashMap<>();
@@ -556,7 +584,7 @@ private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
   private List<NextIntraTaskOperatorInfo> getInternalMainOutputs(
     final IRVertex irVertex,
     final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
-    final Map<RuntimeEdge<IRVertex>, Integer> edgeIndexMap,
+    final Map<Edge, Integer> edgeIndexMap,
     final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap) {
 
     return irVertexDag.getOutgoingEdgesOf(irVertex.getId())
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java
index 9303da832..5242d4661 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java
@@ -47,7 +47,7 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
 
     final OperatorVertex operatorVertex = new OperatorVertex(transform);
     final InputWatermarkManager watermarkManager =
-      new MultiInputWatermarkManager(3, operatorVertex);
+      new MultiInputWatermarkManager(3, new OperatorWatermarkCollector(operatorVertex));
 
     //edge1: 10 s
     //edge2: 5 s
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java
index 63d98d5b1..bf6279c1d 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -29,6 +29,8 @@
 import org.apache.nemo.runtime.common.plan.Task;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.reef.driver.context.ActiveContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.NotThreadSafe;
 import java.util.*;
@@ -49,6 +51,8 @@
  */
 @NotThreadSafe
 public final class ExecutorRepresenter {
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutorRepresenter.class.getName());
+
   private final String executorId;
   private final ResourceSpecification resourceSpecification;
   private final Map<String, Task> runningComplyingTasks;
@@ -113,18 +117,20 @@ public void onTaskScheduled(final Task task) {
         ? runningComplyingTasks : runningNonComplyingTasks).put(task.getTaskId(), task);
     runningTaskToAttempt.put(task, task.getAttemptIdx());
     failedTasks.remove(task);
-    serializationExecutorService.submit(() -> {
+
+
+    serializationExecutorService.execute(() -> {
       final byte[] serialized = SerializationUtils.serialize(task);
       sendControlMessage(
-          ControlMessage.Message.newBuilder()
-              .setId(RuntimeIdManager.generateMessageId())
-              .setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID)
-              .setType(ControlMessage.MessageType.ScheduleTask)
-              .setScheduleTaskMsg(
-                  ControlMessage.ScheduleTaskMsg.newBuilder()
-                      .setTask(ByteString.copyFrom(serialized))
-                      .build())
-              .build());
+        ControlMessage.Message.newBuilder()
+          .setId(RuntimeIdManager.generateMessageId())
+          .setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID)
+          .setType(ControlMessage.MessageType.ScheduleTask)
+          .setScheduleTaskMsg(
+            ControlMessage.ScheduleTaskMsg.newBuilder()
+              .setTask(ByteString.copyFrom(serialized))
+              .build())
+          .build());
     });
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to