This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 5b12d6c [NEMO-231] Add onWatermark() method to Transform (#134)
5b12d6c is described below
commit 5b12d6c93cad377daf4e69f6df1f3ea6b5804908
Author: Taegeon Um <[email protected]>
AuthorDate: Tue Oct 30 13:44:07 2018 +0900
[NEMO-231] Add onWatermark() method to Transform (#134)
JIRA: [NEMO-231: Add onWatermark() method to
Transform](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-231)
**Major changes:**
- Add `onWatermark(watermark)` method to `Transform`
- Add `emitWatermark(watermark)` method to `OutputCollector`
**Minor changes to note:**
-
**Tests for the changes:**
- Fix `testUnboundedSourceVertexDataFetching()` to check whether watermarks
are emitted to the `OutputCollector` and to other `Transform`s
**Other comments:**
-
Closes #134
---
.../org/apache/nemo/common/ir/OutputCollector.java | 7 +++
.../vertex/transform/AggregateMetricTransform.java | 2 +-
.../vertex/transform/MetricCollectTransform.java | 2 +-
.../transform/NoWatermarkEmitTransform.java} | 21 +++----
.../common/ir/vertex/transform/RelayTransform.java | 6 ++
.../nemo/common/ir/vertex/transform/Transform.java | 10 ++++
.../apache/nemo/common/punctuation/Watermark.java | 19 ++++++
.../apache/nemo/common/test/EmptyComponents.java | 3 +-
.../beam/transform/AbstractDoFnTransform.java | 6 +-
.../beam/transform/CreateViewTransform.java | 4 +-
.../frontend/beam/transform/DoFnTransform.java | 8 +++
.../frontend/beam/transform/FlattenTransform.java | 6 ++
.../GroupByKeyAndWindowDoFnTransform.java | 10 +++-
.../beam/transform/GroupByKeyTransform.java | 4 +-
.../frontend/beam/transform/WindowFnTransform.java | 6 ++
.../frontend/spark/transform/CollectTransform.java | 5 +-
.../frontend/spark/transform/FlatMapTransform.java | 6 ++
.../spark/transform/GroupByKeyTransform.java | 3 +-
.../spark/transform/HDFSTextFileTransform.java | 3 +-
.../spark/transform/LocalTextFileTransform.java | 4 +-
.../spark/transform/MapToPairTransform.java | 6 ++
.../frontend/spark/transform/MapTransform.java | 6 ++
.../spark/transform/ReduceByKeyTransform.java | 4 +-
.../frontend/spark/transform/ReduceTransform.java | 6 ++
.../frontend/beam/transform/DoFnTransformTest.java | 6 ++
.../datatransfer/DataFetcherOutputCollector.java | 6 ++
.../datatransfer/DynOptDataOutputCollector.java | 6 ++
.../OperatorVertexOutputCollector.java | 20 +++++++
.../nemo/runtime/executor/task/TaskExecutor.java | 4 +-
.../runtime/executor/task/TaskExecutorTest.java | 70 ++++++++++++++++++++--
30 files changed, 233 insertions(+), 36 deletions(-)
diff --git
a/common/src/main/java/org/apache/nemo/common/ir/OutputCollector.java
b/common/src/main/java/org/apache/nemo/common/ir/OutputCollector.java
index c6bc8ce..a0a79f8 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/OutputCollector.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/OutputCollector.java
@@ -18,6 +18,8 @@
*/
package org.apache.nemo.common.ir;
+import org.apache.nemo.common.punctuation.Watermark;
+
import java.io.Serializable;
/**
@@ -34,6 +36,11 @@ public interface OutputCollector<O> extends Serializable {
void emit(O output);
/**
+ * Emit watermark to downstream vertices.
+ */
+ void emitWatermark(Watermark watermark);
+
+ /**
* Multi-destination emit.
* Currently unused, but might come in handy
* for operations like multi-output map.
diff --git
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/AggregateMetricTransform.java
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/AggregateMetricTransform.java
index ce9c734..42b04b2 100644
---
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/AggregateMetricTransform.java
+++
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/AggregateMetricTransform.java
@@ -31,7 +31,7 @@ import java.util.function.BiFunction;
* @param <I> input type.
* @param <O> output type.
*/
-public final class AggregateMetricTransform<I, O> implements Transform<I, O> {
+public final class AggregateMetricTransform<I, O> extends
NoWatermarkEmitTransform<I, O> {
private static final Logger LOG =
LoggerFactory.getLogger(AggregateMetricTransform.class.getName());
private OutputCollector<O> outputCollector;
private O aggregatedDynOptData;
diff --git
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/MetricCollectTransform.java
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/MetricCollectTransform.java
index ee53c76..9670624 100644
---
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/MetricCollectTransform.java
+++
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/MetricCollectTransform.java
@@ -32,7 +32,7 @@ import java.util.function.BiFunction;
* @param <I> input type.
* @param <O> output type.
*/
-public final class MetricCollectTransform<I, O> implements Transform<I, O> {
+public final class MetricCollectTransform<I, O> extends
NoWatermarkEmitTransform<I, O> {
private static final Logger LOG =
LoggerFactory.getLogger(MetricCollectTransform.class.getName());
private OutputCollector<O> outputCollector;
private O dynOptData;
diff --git
a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/NoWatermarkEmitTransform.java
similarity index 62%
copy from common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
copy to
common/src/main/java/org/apache/nemo/common/ir/vertex/transform/NoWatermarkEmitTransform.java
index 4f24a80..7980897 100644
--- a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
+++
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/NoWatermarkEmitTransform.java
@@ -16,20 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.nemo.common.punctuation;
+package org.apache.nemo.common.ir.vertex.transform;
-import java.io.Serializable;
+import org.apache.nemo.common.punctuation.Watermark;
/**
- * Watermark event.
+ * This transform does not emit watermarks.
+ * It may be a transform for batch operation that emits collected data when
calling {@link Transform#close()}.
+ * @param <I> input type
+ * @param <O> output type
*/
-public final class Watermark implements Serializable {
- private final long timestamp;
- public Watermark(final long timestamp) {
- this.timestamp = timestamp;
- }
+public abstract class NoWatermarkEmitTransform<I, O> implements Transform<I,
O> {
- public long getTimestamp() {
- return timestamp;
+ @Override
+ public final void onWatermark(final Watermark watermark) {
+ // do nothing
}
+
}
diff --git
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
index b74dd8b..b0dbe54 100644
---
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
+++
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
@@ -19,6 +19,7 @@
package org.apache.nemo.common.ir.vertex.transform;
import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
/**
* A {@link Transform} relays input data from upstream vertex to downstream
vertex promptly.
@@ -46,6 +47,11 @@ public final class RelayTransform<T> implements Transform<T,
T> {
}
@Override
+ public void onWatermark(final Watermark watermark) {
+ outputCollector.emitWatermark(watermark);
+ }
+
+ @Override
public void close() {
// Do nothing.
}
diff --git
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
index 67eecde..3df39ee 100644
---
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
+++
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
@@ -19,6 +19,8 @@
package org.apache.nemo.common.ir.vertex.transform;
import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+
import java.io.Serializable;
import java.util.Optional;
@@ -44,6 +46,14 @@ public interface Transform<I, O> extends Serializable {
void onData(I element);
/**
+ * On watermark received.
+ * This method should be called for the minimum watermark among input
streams (input watermark).
+ * Transform may emit collected data after receiving watermarks.
+ * @param watermark watermark
+ */
+ void onWatermark(Watermark watermark);
+
+ /**
* Close the transform.
*/
void close();
diff --git
a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
index 4f24a80..27ee53b 100644
--- a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
+++ b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
@@ -19,11 +19,13 @@
package org.apache.nemo.common.punctuation;
import java.io.Serializable;
+import java.util.Objects;
/**
* Watermark event.
*/
public final class Watermark implements Serializable {
+
private final long timestamp;
public Watermark(final long timestamp) {
this.timestamp = timestamp;
@@ -32,4 +34,21 @@ public final class Watermark implements Serializable {
public long getTimestamp() {
return timestamp;
}
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final Watermark watermark = (Watermark) o;
+ return timestamp == watermark.timestamp;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(timestamp);
+ }
}
diff --git
a/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
b/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
index 9c95bed..20bc145 100644
--- a/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
+++ b/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
@@ -33,6 +33,7 @@ import
org.apache.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.OperatorVertex;
import org.apache.nemo.common.ir.vertex.SourceVertex;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.beam.sdk.values.KV;
@@ -136,7 +137,7 @@ public final class EmptyComponents {
* @param <I> input type.
* @param <O> output type.
*/
- public static class EmptyTransform<I, O> implements Transform<I, O> {
+ public static class EmptyTransform<I, O> extends NoWatermarkEmitTransform<I,
O> {
private final String name;
/**
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
index 564f157..92a1f16 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -131,14 +131,12 @@ public abstract class AbstractDoFnTransform<InputT,
InterT, OutputT> implements
sideInputReader = NullSideInputReader.of(sideInputs);
}
- // create step context
// this transform does not support state and timer.
final StepContext stepContext = new StepContext() {
@Override
public StateInternals stateInternals() {
throw new UnsupportedOperationException("Not support stateInternals in
DoFnTransform");
}
-
@Override
public TimerInternals timerInternals() {
throw new UnsupportedOperationException("Not support timerInternals in
DoFnTransform");
@@ -168,6 +166,10 @@ public abstract class AbstractDoFnTransform<InputT,
InterT, OutputT> implements
doFnRunner.startBundle();
}
+ public final OutputCollector<WindowedValue<OutputT>> getOutputCollector() {
+ return outputCollector;
+ }
+
@Override
public final void close() {
beforeClose();
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
index 3093af1..d60bcfc 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
@@ -20,7 +20,7 @@ package org.apache.nemo.compiler.frontend.beam.transform;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.values.KV;
@@ -35,7 +35,7 @@ import java.util.ArrayList;
* @param <I> input type.
* @param <O> output type.
*/
-public final class CreateViewTransform<I, O> implements
Transform<WindowedValue<I>, WindowedValue<O>> {
+public final class CreateViewTransform<I, O> extends
NoWatermarkEmitTransform<WindowedValue<I>, WindowedValue<O>> {
private final PCollectionView pCollectionView;
private OutputCollector<WindowedValue<O>> outputCollector;
private final ViewFn<Materializations.MultimapView<Void, ?>, O> viewFn;
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
index bc33016..4a57ada 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.punctuation.Watermark;
import java.util.Collection;
import java.util.List;
@@ -67,6 +68,13 @@ public final class DoFnTransform<InputT, OutputT> extends
AbstractDoFnTransform<
}
@Override
+ public void onWatermark(final Watermark watermark) {
+ // TODO #216: We should consider push-back data that waits for side input
+ // TODO #216: If there are push-back data, input watermark >= output
watermark
+ getOutputCollector().emitWatermark(watermark);
+ }
+
+ @Override
protected void beforeClose() {
// nothing
}
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/FlattenTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/FlattenTransform.java
index 6389e33..082e5d7 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/FlattenTransform.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/FlattenTransform.java
@@ -20,6 +20,7 @@ package org.apache.nemo.compiler.frontend.beam.transform;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
/**
* Flatten transform implementation.
@@ -45,6 +46,11 @@ public final class FlattenTransform<T> implements
Transform<T, T> {
}
@Override
+ public void onWatermark(final Watermark watermark) {
+ outputCollector.emitWatermark(watermark);
+ }
+
+ @Override
public void close() {
}
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
index 4a3d866..57475ba 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
+import org.apache.nemo.common.punctuation.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +40,7 @@ import java.util.*;
* @param <InputT> input type.
*/
public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
- extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>,
KV<K, Iterable<InputT>>> {
+ extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>, KV<K,
Iterable<InputT>>> {
private static final Logger LOG =
LoggerFactory.getLogger(GroupByKeyAndWindowDoFnTransform.class.getName());
private final SystemReduceFn reduceFn;
@@ -95,6 +96,12 @@ public final class GroupByKeyAndWindowDoFnTransform<K,
InputT>
keyToValues.get(kv.getKey()).add(element.withValue(kv.getValue()));
}
+ @Override
+ public void onWatermark(final Watermark watermark) {
+ // TODO #230: Emit collected data when receiving watermark
+ // TODO #230: in GroupByKeyAndWindowTransform
+ }
+
/**
* This advances the input watermark and processing time to the timestamp
max value
* in order to emit all data.
@@ -196,4 +203,3 @@ public final class GroupByKeyAndWindowDoFnTransform<K,
InputT>
}
}
}
-
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
index 0aed8aa..4e8edf0 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
@@ -20,7 +20,7 @@ package org.apache.nemo.compiler.frontend.beam.transform;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +31,7 @@ import java.util.*;
* Group Beam KVs.
* @param <I> input type.
*/
-public final class GroupByKeyTransform<I> implements Transform<I,
WindowedValue<KV<Object, List>>> {
+public final class GroupByKeyTransform<I> extends NoWatermarkEmitTransform<I,
WindowedValue<KV<Object, List>>> {
private static final Logger LOG =
LoggerFactory.getLogger(GroupByKeyTransform.class.getName());
private final Map<Object, List> keyToValues;
private OutputCollector<WindowedValue<KV<Object, List>>> outputCollector;
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
index d88faa0..a434618 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.nemo.common.punctuation.Watermark;
import org.joda.time.Instant;
import java.util.Collection;
@@ -89,6 +90,11 @@ public final class WindowFnTransform<T, W extends
BoundedWindow>
}
@Override
+ public void onWatermark(final Watermark watermark) {
+ outputCollector.emitWatermark(watermark);
+ }
+
+ @Override
public void close() {
}
diff --git
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/CollectTransform.java
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/CollectTransform.java
index 5f5532a..7f5aad6 100644
---
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/CollectTransform.java
+++
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/CollectTransform.java
@@ -19,7 +19,7 @@
package org.apache.nemo.compiler.frontend.spark.transform;
import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
import org.apache.commons.lang3.SerializationUtils;
import java.util.ArrayList;
@@ -27,9 +27,10 @@ import java.util.Base64;
/**
* Collect transform.
+ * This transform is used for spark batch job, so do not emit watermark.
* @param <T> type of data to collect.
*/
-public final class CollectTransform<T> implements Transform<T, T> {
+public final class CollectTransform<T> extends NoWatermarkEmitTransform<T, T> {
private final ArrayList<T> list;
private Context ctxt;
diff --git
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/FlatMapTransform.java
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/FlatMapTransform.java
index 55dc993..aca41a0 100644
---
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/FlatMapTransform.java
+++
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/FlatMapTransform.java
@@ -20,6 +20,7 @@ package org.apache.nemo.compiler.frontend.spark.transform;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
import org.apache.spark.api.java.function.FlatMapFunction;
/**
@@ -54,6 +55,11 @@ public final class FlatMapTransform<T, U> implements
Transform<T, U> {
}
@Override
+ public void onWatermark(final Watermark watermark) {
+ outputCollector.emitWatermark(watermark);
+ }
+
+ @Override
public void close() {
}
}
diff --git
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/GroupByKeyTransform.java
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/GroupByKeyTransform.java
index f54226f..7d39aaf 100644
---
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/GroupByKeyTransform.java
+++
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/GroupByKeyTransform.java
@@ -19,6 +19,7 @@
package org.apache.nemo.compiler.frontend.spark.transform;
import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import scala.Tuple2;
@@ -29,7 +30,7 @@ import java.util.*;
* @param <K> key type.
* @param <V> value type.
*/
-public final class GroupByKeyTransform<K, V> implements Transform<Tuple2<K,
V>, Tuple2<K, Iterable<V>>> {
+public final class GroupByKeyTransform<K, V> extends
NoWatermarkEmitTransform<Tuple2<K, V>, Tuple2<K, Iterable<V>>> {
private final Map<K, List<V>> keyToValues;
private OutputCollector<Tuple2<K, Iterable<V>>> outputCollector;
diff --git
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/HDFSTextFileTransform.java
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/HDFSTextFileTransform.java
index be76c56..cf5f14d 100644
---
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/HDFSTextFileTransform.java
+++
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/HDFSTextFileTransform.java
@@ -19,6 +19,7 @@
package org.apache.nemo.compiler.frontend.spark.transform;
import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -34,7 +35,7 @@ import java.util.UUID;
* Transform which saves elements to a HDFS text file for Spark.
* @param <I> input type.
*/
-public final class HDFSTextFileTransform<I> implements Transform<I, String> {
+public final class HDFSTextFileTransform<I> extends
NoWatermarkEmitTransform<I, String> {
private final String path;
private Path fileName;
private List<I> elements;
diff --git
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/LocalTextFileTransform.java
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/LocalTextFileTransform.java
index e5b791d..8f6cc8b 100644
---
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/LocalTextFileTransform.java
+++
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/LocalTextFileTransform.java
@@ -19,7 +19,7 @@
package org.apache.nemo.compiler.frontend.spark.transform;
import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
import java.io.*;
import java.util.ArrayList;
@@ -30,7 +30,7 @@ import java.util.UUID;
* Transform which saves elements to a local text file for Spark.
* @param <I> input type.
*/
-public final class LocalTextFileTransform<I> implements Transform<I, String> {
+public final class LocalTextFileTransform<I> extends
NoWatermarkEmitTransform<I, String> {
private final String path;
private String fileName;
private List<I> elements;
diff --git
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapToPairTransform.java
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapToPairTransform.java
index df14fbd..32f3d12 100644
---
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapToPairTransform.java
+++
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapToPairTransform.java
@@ -20,6 +20,7 @@ package org.apache.nemo.compiler.frontend.spark.transform;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
@@ -57,6 +58,11 @@ public final class MapToPairTransform<T, K, V> implements
Transform<T, Tuple2<K,
}
@Override
+ public void onWatermark(final Watermark watermark) {
+ outputCollector.emitWatermark(watermark);
+ }
+
+ @Override
public void close() {
}
}
diff --git
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapTransform.java
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapTransform.java
index dda84c9..0774b2c 100644
---
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapTransform.java
+++
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapTransform.java
@@ -20,6 +20,7 @@ package org.apache.nemo.compiler.frontend.spark.transform;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
import org.apache.spark.api.java.function.Function;
/**
@@ -54,6 +55,11 @@ public final class MapTransform<I, O> implements
Transform<I, O> {
}
@Override
+ public void onWatermark(final Watermark watermark) {
+ outputCollector.emitWatermark(watermark);
+ }
+
+ @Override
public void close() {
}
}
diff --git
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
index 44f87b8..bea19d3 100644
---
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
+++
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
@@ -19,7 +19,7 @@
package org.apache.nemo.compiler.frontend.spark.transform;
import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
import org.apache.spark.api.java.function.Function2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +32,7 @@ import java.util.*;
* @param <K> key type.
* @param <V> value type.
*/
-public final class ReduceByKeyTransform<K, V> implements Transform<Tuple2<K,
V>, Tuple2<K, V>> {
+public final class ReduceByKeyTransform<K, V> extends
NoWatermarkEmitTransform<Tuple2<K, V>, Tuple2<K, V>> {
private static final Logger LOG =
LoggerFactory.getLogger(ReduceByKeyTransform.class.getName());
private final Map<K, List<V>> keyToValues;
diff --git
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceTransform.java
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceTransform.java
index 4acd211..2141017 100644
---
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceTransform.java
+++
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceTransform.java
@@ -20,6 +20,7 @@ package org.apache.nemo.compiler.frontend.spark.transform;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
import org.apache.spark.api.java.function.Function2;
import javax.annotation.Nullable;
@@ -68,6 +69,11 @@ public final class ReduceTransform<T> implements
Transform<T, T> {
outputCollector.emit(result);
}
+ @Override
+ public void onWatermark(final Watermark watermark) {
+ outputCollector.emitWatermark(watermark);
+ }
+
/**
* Reduce the iterator elements into a single object.
* @param elements the iterator of elements.
diff --git
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
index bddf7c1..8189a48 100644
---
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
+++
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
import org.apache.reef.io.Tuple;
import org.junit.Before;
@@ -208,6 +209,11 @@ public final class DoFnTransformTest {
}
@Override
+ public void emitWatermark(final Watermark watermark) {
+ // do nothing
+ }
+
+ @Override
public <O> void emit(String dstVertexId, O output) {
final WindowedValue<T> val = (WindowedValue<T>) output;
final Tuple<String, WindowedValue<T>> tuple = new Tuple<>(dstVertexId,
val);
diff --git
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
index 0a64042..3f1bc90 100644
---
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
+++
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
@@ -20,6 +20,7 @@ package org.apache.nemo.runtime.executor.datatransfer;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.punctuation.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +45,11 @@ public final class DataFetcherOutputCollector<O> implements
OutputCollector<O> {
}
@Override
+ public void emitWatermark(final Watermark watermark) {
+ nextOperatorVertex.getTransform().onWatermark(watermark);
+ }
+
+ @Override
public <T> void emit(final String dstVertexId, final T output) {
throw new RuntimeException("No additional output tag in
DataFetcherOutputCollector");
}
diff --git
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
index 51cfc72..bbe4c72 100644
---
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
+++
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
@@ -20,6 +20,7 @@ package org.apache.nemo.runtime.executor.datatransfer;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.punctuation.Watermark;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.comm.ControlMessage;
import org.apache.nemo.runtime.common.message.MessageEnvironment;
@@ -80,6 +81,11 @@ public final class DynOptDataOutputCollector<O> implements
OutputCollector<O> {
}
@Override
+ public void emitWatermark(final Watermark watermark) {
+ // do nothing
+ }
+
+ @Override
public <T> void emit(final String dstVertexId, final T output) {
throw new IllegalStateException("Dynamic optimization does not emit tagged
data");
}
diff --git
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
index 715fde8..598cc35 100644
---
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
+++
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
@@ -21,6 +21,7 @@ package org.apache.nemo.runtime.executor.datatransfer;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.punctuation.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,4 +100,23 @@ public final class OperatorVertexOutputCollector<O>
implements OutputCollector<O
}
}
}
+
+ @Override
+ public void emitWatermark(final Watermark watermark) {
+ // Emit watermarks to internal vertices
+ // TODO #232: Implement InputWatermarkManager
+ // TODO #232: We should emit the minimum watermark among multiple input
streams of Transform.
+ for (final OperatorVertex internalVertex : internalMainOutputs) {
+ internalVertex.getTransform().onWatermark(watermark);
+ }
+
+ for (final List<OperatorVertex> internalVertices :
internalAdditionalOutputs.values()) {
+ for (final OperatorVertex internalVertex : internalVertices) {
+ internalVertex.getTransform().onWatermark(watermark);
+ }
+ }
+
+ // TODO #245: handle watermarks in OutputWriter
+ // TODO #245: watermarks are currently not forwarded to the output writer
+ }
}
diff --git
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index 98600cd..9600410 100644
---
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -156,6 +156,7 @@ public final class TaskExecutor {
// Create a harness for each vertex
final List<DataFetcher> nonBroadcastDataFetcherList = new ArrayList<>();
final Map<String, VertexHarness> vertexIdToHarness = new HashMap<>();
+
reverseTopologicallySorted.forEach(irVertex -> {
final Optional<Readable> sourceReader = getSourceVertexReader(irVertex,
task.getIrVertexIdToReadable());
if (sourceReader.isPresent() != irVertex instanceof SourceVertex) {
@@ -248,8 +249,7 @@ public final class TaskExecutor {
}
private void processWatermark(final OutputCollector outputCollector, final
Watermark watermark) {
- // TODO #231: Add onWatermark() method to Transform and
- // TODO #231: fowards watermark to Transforms and OutputWriters
+ outputCollector.emitWatermark(watermark);
}
/**
diff --git
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index b7da5fa..8233663 100644
---
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -38,6 +38,7 @@ import org.apache.nemo.common.ir.vertex.SourceVertex;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.punctuation.Watermark;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
import org.apache.nemo.runtime.common.plan.Stage;
@@ -214,7 +215,6 @@ public final class TaskExecutorTest {
};
final long watermark = 1234567L;
- final AtomicLong emittedWatermark = new AtomicLong(0);
final Readable readable = new Readable() {
int pointer = 0;
@@ -247,7 +247,6 @@ public final class TaskExecutorTest {
@Override
public long readWatermark() {
watermarkEmitted = true;
- emittedWatermark.set(watermark);
return watermark;
}
@@ -268,13 +267,19 @@ public final class TaskExecutorTest {
final Map<String, Readable> vertexIdToReadable = new HashMap<>();
vertexIdToReadable.put(sourceIRVertex.getId(), readable);
+ final List<Watermark> emittedWatermarks = new LinkedList<>();
+
+ final Transform transform = new
RelayTransformNoWatermarkEmit(emittedWatermarks);
+ final OperatorVertex operatorVertex = new OperatorVertex(transform);
final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag =
new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>()
.addVertex(sourceIRVertex)
+ .addVertex(operatorVertex)
+ .connectVertices(createEdge(sourceIRVertex, operatorVertex, "edge1"))
.buildWithoutSourceSinkCheck();
- final StageEdge taskOutEdge = mockStageEdgeFrom(sourceIRVertex);
+ final StageEdge taskOutEdge = mockStageEdgeFrom(operatorVertex);
final Task task =
new Task(
"testSourceVertexDataFetching",
@@ -290,10 +295,10 @@ public final class TaskExecutorTest {
taskExecutor.execute();
// Check whether the watermark is emitted
- assertEquals(watermark, emittedWatermark.get());
+ assertEquals(Arrays.asList(new Watermark(watermark)), emittedWatermarks);
// Check the output.
- assertTrue(checkEqualElements(elements,
runtimeEdgeToOutputData.get(taskOutEdge.getId())));
+ assertEquals(elements, runtimeEdgeToOutputData.get(taskOutEdge.getId()));
}
/**
@@ -563,6 +568,41 @@ public final class TaskExecutorTest {
}
/**
+ * This transform records incoming watermarks instead of forwarding them to OutputWriter,
+ * because OutputWriter currently does not support watermarks (TODO #245).
+ * @param <T> type
+ */
+ private class RelayTransformNoWatermarkEmit<T> implements Transform<T, T> {
+ private OutputCollector<T> outputCollector;
+ private final List<Watermark> emittedWatermarks;
+
+ RelayTransformNoWatermarkEmit(final List<Watermark> emittedWatermarks) {
+ this.emittedWatermarks = emittedWatermarks;
+ }
+
+ @Override
+ public void prepare(final Context context, final OutputCollector<T>
outputCollector) {
+ this.outputCollector = outputCollector;
+ }
+
+ @Override
+ public void onWatermark(Watermark watermark) {
+ emittedWatermarks.add(watermark);
+ }
+
+ @Override
+ public void onData(final Object element) {
+ outputCollector.emit((T) element);
+ }
+
+ @Override
+ public void close() {
+ // Do nothing.
+ }
+ }
+
+
+ /**
* Simple identity function for testing.
* @param <T> input/output type.
*/
@@ -575,6 +615,11 @@ public final class TaskExecutorTest {
}
@Override
+ public void onWatermark(Watermark watermark) {
+ outputCollector.emitWatermark(watermark);
+ }
+
+ @Override
public void onData(final Object element) {
outputCollector.emit((T) element);
}
@@ -600,6 +645,11 @@ public final class TaskExecutorTest {
}
@Override
+ public void onWatermark(Watermark watermark) {
+ // do nothing
+ }
+
+ @Override
public void onData(final Object element) {
list.add((T) element);
}
@@ -630,6 +680,11 @@ public final class TaskExecutorTest {
}
@Override
+ public void onWatermark(Watermark watermark) {
+ outputCollector.emitWatermark(watermark);
+ }
+
+ @Override
public void onData(final Object element) {
final Object broadcastVariable =
context.getBroadcastVariable(broadcastVariableId);
outputCollector.emit((T) Pair.of(broadcastVariable, element));
@@ -670,6 +725,11 @@ public final class TaskExecutorTest {
}
@Override
+ public void onWatermark(Watermark watermark) {
+ outputCollector.emitWatermark(watermark);
+ }
+
+ @Override
public void close() {
// Do nothing.
}