This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b12d6c  [NEMO-231] Add onWatermark() method to Transform (#134)
5b12d6c is described below

commit 5b12d6c93cad377daf4e69f6df1f3ea6b5804908
Author: Taegeon Um <[email protected]>
AuthorDate: Tue Oct 30 13:44:07 2018 +0900

    [NEMO-231] Add onWatermark() method to Transform (#134)
    
    JIRA: [NEMO-231: Add onWatermark() method to Transform](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-231)
    
    **Major changes:**
    - Add `onWatermark(watermark)` method to `Transform`
    - Add `emitWatermark(watermark)` method to `OutputCollector`
    
    **Minor changes to note:**
    -
    
    **Tests for the changes:**
    - Update `testUnboundedSourceVertexDataFetching()` to check that watermarks 
are emitted to the `OutputCollector` and to other `Transform`s
    
    **Other comments:**
    -
    
    Closes #134
---
 .../org/apache/nemo/common/ir/OutputCollector.java |  7 +++
 .../vertex/transform/AggregateMetricTransform.java |  2 +-
 .../vertex/transform/MetricCollectTransform.java   |  2 +-
 .../transform/NoWatermarkEmitTransform.java}       | 21 +++----
 .../common/ir/vertex/transform/RelayTransform.java |  6 ++
 .../nemo/common/ir/vertex/transform/Transform.java | 10 ++++
 .../apache/nemo/common/punctuation/Watermark.java  | 19 ++++++
 .../apache/nemo/common/test/EmptyComponents.java   |  3 +-
 .../beam/transform/AbstractDoFnTransform.java      |  6 +-
 .../beam/transform/CreateViewTransform.java        |  4 +-
 .../frontend/beam/transform/DoFnTransform.java     |  8 +++
 .../frontend/beam/transform/FlattenTransform.java  |  6 ++
 .../GroupByKeyAndWindowDoFnTransform.java          | 10 +++-
 .../beam/transform/GroupByKeyTransform.java        |  4 +-
 .../frontend/beam/transform/WindowFnTransform.java |  6 ++
 .../frontend/spark/transform/CollectTransform.java |  5 +-
 .../frontend/spark/transform/FlatMapTransform.java |  6 ++
 .../spark/transform/GroupByKeyTransform.java       |  3 +-
 .../spark/transform/HDFSTextFileTransform.java     |  3 +-
 .../spark/transform/LocalTextFileTransform.java    |  4 +-
 .../spark/transform/MapToPairTransform.java        |  6 ++
 .../frontend/spark/transform/MapTransform.java     |  6 ++
 .../spark/transform/ReduceByKeyTransform.java      |  4 +-
 .../frontend/spark/transform/ReduceTransform.java  |  6 ++
 .../frontend/beam/transform/DoFnTransformTest.java |  6 ++
 .../datatransfer/DataFetcherOutputCollector.java   |  6 ++
 .../datatransfer/DynOptDataOutputCollector.java    |  6 ++
 .../OperatorVertexOutputCollector.java             | 20 +++++++
 .../nemo/runtime/executor/task/TaskExecutor.java   |  4 +-
 .../runtime/executor/task/TaskExecutorTest.java    | 70 ++++++++++++++++++++--
 30 files changed, 233 insertions(+), 36 deletions(-)

diff --git 
a/common/src/main/java/org/apache/nemo/common/ir/OutputCollector.java 
b/common/src/main/java/org/apache/nemo/common/ir/OutputCollector.java
index c6bc8ce..a0a79f8 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/OutputCollector.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/OutputCollector.java
@@ -18,6 +18,8 @@
  */
 package org.apache.nemo.common.ir;
 
+import org.apache.nemo.common.punctuation.Watermark;
+
 import java.io.Serializable;
 
 /**
@@ -34,6 +36,11 @@ public interface OutputCollector<O> extends Serializable {
   void emit(O output);
 
   /**
+   * Emit watermark to downstream vertices.
+   */
+  void emitWatermark(Watermark watermark);
+
+  /**
    * Multi-destination emit.
    * Currently unused, but might come in handy
    * for operations like multi-output map.
diff --git 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/AggregateMetricTransform.java
 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/AggregateMetricTransform.java
index ce9c734..42b04b2 100644
--- 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/AggregateMetricTransform.java
+++ 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/AggregateMetricTransform.java
@@ -31,7 +31,7 @@ import java.util.function.BiFunction;
  * @param <I> input type.
  * @param <O> output type.
  */
-public final class AggregateMetricTransform<I, O> implements Transform<I, O> {
+public final class AggregateMetricTransform<I, O> extends 
NoWatermarkEmitTransform<I, O> {
   private static final Logger LOG = 
LoggerFactory.getLogger(AggregateMetricTransform.class.getName());
   private OutputCollector<O> outputCollector;
   private O aggregatedDynOptData;
diff --git 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/MetricCollectTransform.java
 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/MetricCollectTransform.java
index ee53c76..9670624 100644
--- 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/MetricCollectTransform.java
+++ 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/MetricCollectTransform.java
@@ -32,7 +32,7 @@ import java.util.function.BiFunction;
  * @param <I> input type.
  * @param <O> output type.
  */
-public final class MetricCollectTransform<I, O> implements Transform<I, O> {
+public final class MetricCollectTransform<I, O> extends 
NoWatermarkEmitTransform<I, O> {
   private static final Logger LOG = 
LoggerFactory.getLogger(MetricCollectTransform.class.getName());
   private OutputCollector<O> outputCollector;
   private O dynOptData;
diff --git 
a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/NoWatermarkEmitTransform.java
similarity index 62%
copy from common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
copy to 
common/src/main/java/org/apache/nemo/common/ir/vertex/transform/NoWatermarkEmitTransform.java
index 4f24a80..7980897 100644
--- a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
+++ 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/NoWatermarkEmitTransform.java
@@ -16,20 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.common.punctuation;
+package org.apache.nemo.common.ir.vertex.transform;
 
-import java.io.Serializable;
+import org.apache.nemo.common.punctuation.Watermark;
 
 /**
- * Watermark event.
+ * This transform does not emit watermarks.
+ * It may be a transform for a batch operation that emits its collected data when 
{@link Transform#close()} is called.
+ * @param <I> input type
+ * @param <O> output type
  */
-public final class Watermark implements Serializable {
-  private final long timestamp;
-  public Watermark(final long timestamp) {
-    this.timestamp = timestamp;
-  }
+public abstract class NoWatermarkEmitTransform<I, O> implements Transform<I, 
O> {
 
-  public long getTimestamp() {
-    return timestamp;
+  @Override
+  public final void onWatermark(final Watermark watermark) {
+    // do nothing
   }
+
 }
diff --git 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
index b74dd8b..b0dbe54 100644
--- 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
+++ 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
@@ -19,6 +19,7 @@
 package org.apache.nemo.common.ir.vertex.transform;
 
 import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
 
 /**
  * A {@link Transform} relays input data from upstream vertex to downstream 
vertex promptly.
@@ -46,6 +47,11 @@ public final class RelayTransform<T> implements Transform<T, 
T> {
   }
 
   @Override
+  public void onWatermark(final Watermark watermark) {
+    outputCollector.emitWatermark(watermark);
+  }
+
+  @Override
   public void close() {
     // Do nothing.
   }
diff --git 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
index 67eecde..3df39ee 100644
--- 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
+++ 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
@@ -19,6 +19,8 @@
 package org.apache.nemo.common.ir.vertex.transform;
 
 import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+
 import java.io.Serializable;
 import java.util.Optional;
 
@@ -44,6 +46,14 @@ public interface Transform<I, O> extends Serializable {
   void onData(I element);
 
   /**
+   * Called when a watermark is received.
+   * This method should be called with the minimum watermark among the input 
streams (the input watermark).
+   * A transform may emit its collected data after receiving a watermark.
+   * @param watermark watermark
+   */
+  void onWatermark(Watermark watermark);
+
+  /**
    * Close the transform.
    */
   void close();
diff --git 
a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java 
b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
index 4f24a80..27ee53b 100644
--- a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
+++ b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
@@ -19,11 +19,13 @@
 package org.apache.nemo.common.punctuation;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 /**
  * Watermark event.
  */
 public final class Watermark implements Serializable {
+
   private final long timestamp;
   public Watermark(final long timestamp) {
     this.timestamp = timestamp;
@@ -32,4 +34,21 @@ public final class Watermark implements Serializable {
   public long getTimestamp() {
     return timestamp;
   }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final Watermark watermark = (Watermark) o;
+    return timestamp == watermark.timestamp;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(timestamp);
+  }
 }
diff --git 
a/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java 
b/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
index 9c95bed..20bc145 100644
--- a/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
+++ b/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
@@ -33,6 +33,7 @@ import 
org.apache.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.OperatorVertex;
 import org.apache.nemo.common.ir.vertex.SourceVertex;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.beam.sdk.values.KV;
 
@@ -136,7 +137,7 @@ public final class EmptyComponents {
    * @param <I> input type.
    * @param <O> output type.
    */
-  public static class EmptyTransform<I, O> implements Transform<I, O> {
+  public static class EmptyTransform<I, O> extends NoWatermarkEmitTransform<I, 
O> {
     private final String name;
 
     /**
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
index 564f157..92a1f16 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -131,14 +131,12 @@ public abstract class AbstractDoFnTransform<InputT, 
InterT, OutputT> implements
       sideInputReader = NullSideInputReader.of(sideInputs);
     }
 
-    // create step context
     // this transform does not support state and timer.
     final StepContext stepContext = new StepContext() {
       @Override
       public StateInternals stateInternals() {
         throw new UnsupportedOperationException("Not support stateInternals in 
DoFnTransform");
       }
-
       @Override
       public TimerInternals timerInternals() {
         throw new UnsupportedOperationException("Not support timerInternals in 
DoFnTransform");
@@ -168,6 +166,10 @@ public abstract class AbstractDoFnTransform<InputT, 
InterT, OutputT> implements
     doFnRunner.startBundle();
   }
 
+  public final OutputCollector<WindowedValue<OutputT>> getOutputCollector() {
+    return outputCollector;
+  }
+
   @Override
   public final void close() {
     beforeClose();
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
index 3093af1..d60bcfc 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
@@ -20,7 +20,7 @@ package org.apache.nemo.compiler.frontend.beam.transform;
 
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
 import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.values.KV;
@@ -35,7 +35,7 @@ import java.util.ArrayList;
  * @param <I> input type.
  * @param <O> output type.
  */
-public final class CreateViewTransform<I, O> implements 
Transform<WindowedValue<I>, WindowedValue<O>> {
+public final class CreateViewTransform<I, O> extends 
NoWatermarkEmitTransform<WindowedValue<I>, WindowedValue<O>> {
   private final PCollectionView pCollectionView;
   private OutputCollector<WindowedValue<O>> outputCollector;
   private final ViewFn<Materializations.MultimapView<Void, ?>, O> viewFn;
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
index bc33016..4a57ada 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.punctuation.Watermark;
 
 import java.util.Collection;
 import java.util.List;
@@ -67,6 +68,13 @@ public final class DoFnTransform<InputT, OutputT> extends 
AbstractDoFnTransform<
   }
 
   @Override
+  public void onWatermark(final Watermark watermark) {
+    // TODO #216: We should consider push-back data that waits for side input
+    // TODO #216: If there are push-back data, input watermark >= output 
watermark
+    getOutputCollector().emitWatermark(watermark);
+  }
+
+  @Override
   protected void beforeClose() {
     // nothing
   }
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/FlattenTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/FlattenTransform.java
index 6389e33..082e5d7 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/FlattenTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/FlattenTransform.java
@@ -20,6 +20,7 @@ package org.apache.nemo.compiler.frontend.beam.transform;
 
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
 
 /**
  * Flatten transform implementation.
@@ -45,6 +46,11 @@ public final class FlattenTransform<T> implements 
Transform<T, T> {
   }
 
   @Override
+  public void onWatermark(final Watermark watermark) {
+    outputCollector.emitWatermark(watermark);
+  }
+
+  @Override
   public void close() {
   }
 
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
index 4a3d866..57475ba 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
+import org.apache.nemo.common.punctuation.Watermark;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +40,7 @@ import java.util.*;
  * @param <InputT> input type.
  */
 public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
-    extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>, 
KV<K, Iterable<InputT>>> {
+  extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>, KV<K, 
Iterable<InputT>>> {
   private static final Logger LOG = 
LoggerFactory.getLogger(GroupByKeyAndWindowDoFnTransform.class.getName());
 
   private final SystemReduceFn reduceFn;
@@ -95,6 +96,12 @@ public final class GroupByKeyAndWindowDoFnTransform<K, 
InputT>
     keyToValues.get(kv.getKey()).add(element.withValue(kv.getValue()));
   }
 
+  @Override
+  public void onWatermark(final Watermark watermark) {
+    // TODO #230: Emit collected data when receiving watermark
+    // TODO #230: in GroupByKeyAndWindowTransform
+  }
+
   /**
    * This advances the input watermark and processing time to the timestamp 
max value
    * in order to emit all data.
@@ -196,4 +203,3 @@ public final class GroupByKeyAndWindowDoFnTransform<K, 
InputT>
     }
   }
 }
-
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
index 0aed8aa..4e8edf0 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
@@ -20,7 +20,7 @@ package org.apache.nemo.compiler.frontend.beam.transform;
 
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
 import org.apache.beam.sdk.values.KV;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,7 +31,7 @@ import java.util.*;
  * Group Beam KVs.
  * @param <I> input type.
  */
-public final class GroupByKeyTransform<I> implements Transform<I, 
WindowedValue<KV<Object, List>>> {
+public final class GroupByKeyTransform<I> extends NoWatermarkEmitTransform<I, 
WindowedValue<KV<Object, List>>> {
   private static final Logger LOG = 
LoggerFactory.getLogger(GroupByKeyTransform.class.getName());
   private final Map<Object, List> keyToValues;
   private OutputCollector<WindowedValue<KV<Object, List>>> outputCollector;
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
index d88faa0..a434618 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.nemo.common.punctuation.Watermark;
 import org.joda.time.Instant;
 
 import java.util.Collection;
@@ -89,6 +90,11 @@ public final class WindowFnTransform<T, W extends 
BoundedWindow>
   }
 
   @Override
+  public void onWatermark(final Watermark watermark) {
+    outputCollector.emitWatermark(watermark);
+  }
+
+  @Override
   public void close() {
   }
 
diff --git 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/CollectTransform.java
 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/CollectTransform.java
index 5f5532a..7f5aad6 100644
--- 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/CollectTransform.java
+++ 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/CollectTransform.java
@@ -19,7 +19,7 @@
 package org.apache.nemo.compiler.frontend.spark.transform;
 
 import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
 import org.apache.commons.lang3.SerializationUtils;
 
 import java.util.ArrayList;
@@ -27,9 +27,10 @@ import java.util.Base64;
 
 /**
  * Collect transform.
+ * This transform is used for Spark batch jobs, so it does not emit watermarks.
  * @param <T> type of data to collect.
  */
-public final class CollectTransform<T> implements Transform<T, T> {
+public final class CollectTransform<T> extends NoWatermarkEmitTransform<T, T> {
   private final ArrayList<T> list;
   private Context ctxt;
 
diff --git 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/FlatMapTransform.java
 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/FlatMapTransform.java
index 55dc993..aca41a0 100644
--- 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/FlatMapTransform.java
+++ 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/FlatMapTransform.java
@@ -20,6 +20,7 @@ package org.apache.nemo.compiler.frontend.spark.transform;
 
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.spark.api.java.function.FlatMapFunction;
 
 /**
@@ -54,6 +55,11 @@ public final class FlatMapTransform<T, U> implements 
Transform<T, U> {
   }
 
   @Override
+  public void onWatermark(final Watermark watermark) {
+    outputCollector.emitWatermark(watermark);
+  }
+
+  @Override
   public void close() {
   }
 }
diff --git 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/GroupByKeyTransform.java
 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/GroupByKeyTransform.java
index f54226f..7d39aaf 100644
--- 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/GroupByKeyTransform.java
+++ 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/GroupByKeyTransform.java
@@ -19,6 +19,7 @@
 package org.apache.nemo.compiler.frontend.spark.transform;
 
 import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import scala.Tuple2;
 
@@ -29,7 +30,7 @@ import java.util.*;
  * @param <K> key type.
  * @param <V> value type.
  */
-public final class GroupByKeyTransform<K, V> implements Transform<Tuple2<K, 
V>, Tuple2<K, Iterable<V>>> {
+public final class GroupByKeyTransform<K, V> extends 
NoWatermarkEmitTransform<Tuple2<K, V>, Tuple2<K, Iterable<V>>> {
   private final Map<K, List<V>> keyToValues;
   private OutputCollector<Tuple2<K, Iterable<V>>> outputCollector;
 
diff --git 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/HDFSTextFileTransform.java
 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/HDFSTextFileTransform.java
index be76c56..cf5f14d 100644
--- 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/HDFSTextFileTransform.java
+++ 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/HDFSTextFileTransform.java
@@ -19,6 +19,7 @@
 package org.apache.nemo.compiler.frontend.spark.transform;
 
 import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -34,7 +35,7 @@ import java.util.UUID;
  * Transform which saves elements to a HDFS text file for Spark.
  * @param <I> input type.
  */
-public final class HDFSTextFileTransform<I> implements Transform<I, String> {
+public final class HDFSTextFileTransform<I> extends 
NoWatermarkEmitTransform<I, String> {
   private final String path;
   private Path fileName;
   private List<I> elements;
diff --git 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/LocalTextFileTransform.java
 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/LocalTextFileTransform.java
index e5b791d..8f6cc8b 100644
--- 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/LocalTextFileTransform.java
+++ 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/LocalTextFileTransform.java
@@ -19,7 +19,7 @@
 package org.apache.nemo.compiler.frontend.spark.transform;
 
 import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
 
 import java.io.*;
 import java.util.ArrayList;
@@ -30,7 +30,7 @@ import java.util.UUID;
  * Transform which saves elements to a local text file for Spark.
  * @param <I> input type.
  */
-public final class LocalTextFileTransform<I> implements Transform<I, String> {
+public final class LocalTextFileTransform<I> extends 
NoWatermarkEmitTransform<I, String> {
   private final String path;
   private String fileName;
   private List<I> elements;
diff --git 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapToPairTransform.java
 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapToPairTransform.java
index df14fbd..32f3d12 100644
--- 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapToPairTransform.java
+++ 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapToPairTransform.java
@@ -20,6 +20,7 @@ package org.apache.nemo.compiler.frontend.spark.transform;
 
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.spark.api.java.function.PairFunction;
 import scala.Tuple2;
 
@@ -57,6 +58,11 @@ public final class MapToPairTransform<T, K, V> implements 
Transform<T, Tuple2<K,
   }
 
   @Override
+  public void onWatermark(final Watermark watermark) {
+    outputCollector.emitWatermark(watermark);
+  }
+
+  @Override
   public void close() {
   }
 }
diff --git 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapTransform.java
 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapTransform.java
index dda84c9..0774b2c 100644
--- 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapTransform.java
+++ 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapTransform.java
@@ -20,6 +20,7 @@ package org.apache.nemo.compiler.frontend.spark.transform;
 
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.spark.api.java.function.Function;
 
 /**
@@ -54,6 +55,11 @@ public final class MapTransform<I, O> implements 
Transform<I, O> {
   }
 
   @Override
+  public void onWatermark(final Watermark watermark) {
+    outputCollector.emitWatermark(watermark);
+  }
+
+  @Override
   public void close() {
   }
 }
diff --git 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
index 44f87b8..bea19d3 100644
--- 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
+++ 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
@@ -19,7 +19,7 @@
 package org.apache.nemo.compiler.frontend.spark.transform;
 
 import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
 import org.apache.spark.api.java.function.Function2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,7 +32,7 @@ import java.util.*;
  * @param <K> key type.
  * @param <V> value type.
  */
-public final class ReduceByKeyTransform<K, V> implements Transform<Tuple2<K, 
V>, Tuple2<K, V>> {
+public final class ReduceByKeyTransform<K, V> extends 
NoWatermarkEmitTransform<Tuple2<K, V>, Tuple2<K, V>> {
   private static final Logger LOG = 
LoggerFactory.getLogger(ReduceByKeyTransform.class.getName());
 
   private final Map<K, List<V>> keyToValues;
diff --git 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceTransform.java
 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceTransform.java
index 4acd211..2141017 100644
--- 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceTransform.java
+++ 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceTransform.java
@@ -20,6 +20,7 @@ package org.apache.nemo.compiler.frontend.spark.transform;
 
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.spark.api.java.function.Function2;
 
 import javax.annotation.Nullable;
@@ -68,6 +69,11 @@ public final class ReduceTransform<T> implements Transform<T, T> {
     outputCollector.emit(result);
   }
 
+  @Override
+  public void onWatermark(final Watermark watermark) {
+    outputCollector.emitWatermark(watermark);
+  }
+
   /**
    * Reduce the iterator elements into a single object.
    * @param elements the iterator of elements.
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
index bddf7c1..8189a48 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
 import org.apache.reef.io.Tuple;
 import org.junit.Before;
@@ -208,6 +209,11 @@ public final class DoFnTransformTest {
     }
 
     @Override
+    public void emitWatermark(final Watermark watermark) {
+      // do nothing
+    }
+
+    @Override
     public <O> void emit(String dstVertexId, O output) {
       final WindowedValue<T> val = (WindowedValue<T>) output;
       final Tuple<String, WindowedValue<T>> tuple = new Tuple<>(dstVertexId, val);
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
index 0a64042..3f1bc90 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
@@ -20,6 +20,7 @@ package org.apache.nemo.runtime.executor.datatransfer;
 
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.punctuation.Watermark;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,6 +45,11 @@ public final class DataFetcherOutputCollector<O> implements OutputCollector<O> {
   }
 
   @Override
+  public void emitWatermark(final Watermark watermark) {
+    nextOperatorVertex.getTransform().onWatermark(watermark);
+  }
+
+  @Override
   public <T> void emit(final String dstVertexId, final T output) {
     throw new RuntimeException("No additional output tag in DataFetcherOutputCollector");
   }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
index 51cfc72..bbe4c72 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
@@ -20,6 +20,7 @@ package org.apache.nemo.runtime.executor.datatransfer;
 
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.comm.ControlMessage;
 import org.apache.nemo.runtime.common.message.MessageEnvironment;
@@ -80,6 +81,11 @@ public final class DynOptDataOutputCollector<O> implements OutputCollector<O> {
   }
 
   @Override
+  public void emitWatermark(final Watermark watermark) {
+    // do nothing
+  }
+
+  @Override
   public <T> void emit(final String dstVertexId, final T output) {
     throw new IllegalStateException("Dynamic optimization does not emit tagged data");
   }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
index 715fde8..598cc35 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
@@ -21,6 +21,7 @@ package org.apache.nemo.runtime.executor.datatransfer;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.punctuation.Watermark;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -99,4 +100,23 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O
       }
     }
   }
+
+  @Override
+  public void emitWatermark(final Watermark watermark) {
+    // Emit watermarks to internal vertices
+    // TODO #232: Implement InputWatermarkManager
+    // TODO #232: We should emit the minimum watermark among multiple input streams of Transform.
+    for (final OperatorVertex internalVertex : internalMainOutputs) {
+      internalVertex.getTransform().onWatermark(watermark);
+    }
+
+    for (final List<OperatorVertex> internalVertices : internalAdditionalOutputs.values()) {
+      for (final OperatorVertex internalVertex : internalVertices) {
+        internalVertex.getTransform().onWatermark(watermark);
+      }
+    }
+
+    // TODO #245: handle watermarks in OutputWriter
+    // TODO #245: currently ignore emitting watermarks to output writer
+  }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index 98600cd..9600410 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -156,6 +156,7 @@ public final class TaskExecutor {
     // Create a harness for each vertex
     final List<DataFetcher> nonBroadcastDataFetcherList = new ArrayList<>();
     final Map<String, VertexHarness> vertexIdToHarness = new HashMap<>();
+
     reverseTopologicallySorted.forEach(irVertex -> {
       final Optional<Readable> sourceReader = getSourceVertexReader(irVertex, task.getIrVertexIdToReadable());
       if (sourceReader.isPresent() != irVertex instanceof SourceVertex) {
@@ -248,8 +249,7 @@ public final class TaskExecutor {
   }
 
   private void processWatermark(final OutputCollector outputCollector, final Watermark watermark) {
-    // TODO #231: Add onWatermark() method to Transform and
-    // TODO #231: fowards watermark to Transforms and OutputWriters
+    outputCollector.emitWatermark(watermark);
   }
 
   /**
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index b7da5fa..8233663 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -38,6 +38,7 @@ import org.apache.nemo.common.ir.vertex.SourceVertex;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
 import org.apache.nemo.runtime.common.plan.Stage;
@@ -214,7 +215,6 @@ public final class TaskExecutorTest {
     };
 
     final long watermark = 1234567L;
-    final AtomicLong emittedWatermark = new AtomicLong(0);
 
     final Readable readable = new Readable() {
       int pointer = 0;
@@ -247,7 +247,6 @@ public final class TaskExecutorTest {
       @Override
       public long readWatermark() {
         watermarkEmitted = true;
-        emittedWatermark.set(watermark);
         return watermark;
       }
 
@@ -268,13 +267,19 @@ public final class TaskExecutorTest {
 
     final Map<String, Readable> vertexIdToReadable = new HashMap<>();
     vertexIdToReadable.put(sourceIRVertex.getId(), readable);
+    final List<Watermark> emittedWatermarks = new LinkedList<>();
+
+    final Transform transform = new RelayTransformNoWatermarkEmit(emittedWatermarks);
+    final OperatorVertex operatorVertex = new OperatorVertex(transform);
 
     final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag =
       new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>()
         .addVertex(sourceIRVertex)
+        .addVertex(operatorVertex)
+        .connectVertices(createEdge(sourceIRVertex, operatorVertex, "edge1"))
         .buildWithoutSourceSinkCheck();
 
-    final StageEdge taskOutEdge = mockStageEdgeFrom(sourceIRVertex);
+    final StageEdge taskOutEdge = mockStageEdgeFrom(operatorVertex);
     final Task task =
       new Task(
         "testSourceVertexDataFetching",
@@ -290,10 +295,10 @@ public final class TaskExecutorTest {
     taskExecutor.execute();
 
     // Check whether the watermark is emitted
-    assertEquals(watermark, emittedWatermark.get());
+    assertEquals(Arrays.asList(new Watermark(watermark)), emittedWatermarks);
 
     // Check the output.
-    assertTrue(checkEqualElements(elements, runtimeEdgeToOutputData.get(taskOutEdge.getId())));
+    assertEquals(elements, runtimeEdgeToOutputData.get(taskOutEdge.getId()));
   }
 
   /**
@@ -563,6 +568,41 @@ public final class TaskExecutorTest {
   }
 
   /**
+   * This transform does not emit watermark to OutputWriter
+   * because OutputWriter currently does not support watermarks (TODO #245)
+   * @param <T> type
+   */
+  private class RelayTransformNoWatermarkEmit<T> implements Transform<T, T> {
+    private OutputCollector<T> outputCollector;
+    private final List<Watermark> emittedWatermarks;
+
+    RelayTransformNoWatermarkEmit(final List<Watermark> emittedWatermarks) {
+      this.emittedWatermarks = emittedWatermarks;
+    }
+
+    @Override
+    public void prepare(final Context context, final OutputCollector<T> outputCollector) {
+      this.outputCollector = outputCollector;
+    }
+
+    @Override
+    public void onWatermark(Watermark watermark) {
+      emittedWatermarks.add(watermark);
+    }
+
+    @Override
+    public void onData(final Object element) {
+      outputCollector.emit((T) element);
+    }
+
+    @Override
+    public void close() {
+      // Do nothing.
+    }
+  }
+
+
+  /**
    * Simple identity function for testing.
    * @param <T> input/output type.
    */
@@ -575,6 +615,11 @@ public final class TaskExecutorTest {
     }
 
     @Override
+    public void onWatermark(Watermark watermark) {
+      outputCollector.emitWatermark(watermark);
+    }
+
+    @Override
     public void onData(final Object element) {
       outputCollector.emit((T) element);
     }
@@ -600,6 +645,11 @@ public final class TaskExecutorTest {
     }
 
     @Override
+    public void onWatermark(Watermark watermark) {
+      // do nothing
+    }
+
+    @Override
     public void onData(final Object element) {
       list.add((T) element);
     }
@@ -630,6 +680,11 @@ public final class TaskExecutorTest {
     }
 
     @Override
+    public void onWatermark(Watermark watermark) {
+      outputCollector.emitWatermark(watermark);
+    }
+
+    @Override
     public void onData(final Object element) {
       final Object broadcastVariable = context.getBroadcastVariable(broadcastVariableId);
       outputCollector.emit((T) Pair.of(broadcastVariable, element));
@@ -670,6 +725,11 @@ public final class TaskExecutorTest {
     }
 
     @Override
+    public void onWatermark(Watermark watermark) {
+      outputCollector.emitWatermark(watermark);
+    }
+
+    @Override
     public void close() {
       // Do nothing.
     }

Reply via email to