johnyangk closed pull request #134: [NEMO-231] Add onWatermark() method to
Transform
URL: https://github.com/apache/incubator-nemo/pull/134
This PR was merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once
it has been merged, the diff is reproduced below for the sake of provenance:
diff --git
a/common/src/main/java/org/apache/nemo/common/ir/OutputCollector.java
b/common/src/main/java/org/apache/nemo/common/ir/OutputCollector.java
index c6bc8ce98..a0a79f82a 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/OutputCollector.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/OutputCollector.java
@@ -18,6 +18,8 @@
*/
package org.apache.nemo.common.ir;
+import org.apache.nemo.common.punctuation.Watermark;
+
import java.io.Serializable;
/**
@@ -33,6 +35,11 @@
*/
void emit(O output);
+ /**
+ * Emit watermark to downstream vertices.
+ */
+ void emitWatermark(Watermark watermark);
+
/**
* Multi-destination emit.
* Currently unused, but might come in handy
diff --git
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/AggregateMetricTransform.java
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/AggregateMetricTransform.java
index ce9c7341e..42b04b230 100644
---
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/AggregateMetricTransform.java
+++
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/AggregateMetricTransform.java
@@ -31,7 +31,7 @@
* @param <I> input type.
* @param <O> output type.
*/
-public final class AggregateMetricTransform<I, O> implements Transform<I, O> {
+public final class AggregateMetricTransform<I, O> extends
NoWatermarkEmitTransform<I, O> {
private static final Logger LOG =
LoggerFactory.getLogger(AggregateMetricTransform.class.getName());
private OutputCollector<O> outputCollector;
private O aggregatedDynOptData;
diff --git
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/MetricCollectTransform.java
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/MetricCollectTransform.java
index ee53c7672..9670624db 100644
---
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/MetricCollectTransform.java
+++
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/MetricCollectTransform.java
@@ -32,7 +32,7 @@
* @param <I> input type.
* @param <O> output type.
*/
-public final class MetricCollectTransform<I, O> implements Transform<I, O> {
+public final class MetricCollectTransform<I, O> extends
NoWatermarkEmitTransform<I, O> {
private static final Logger LOG =
LoggerFactory.getLogger(MetricCollectTransform.class.getName());
private OutputCollector<O> outputCollector;
private O dynOptData;
diff --git
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/NoWatermarkEmitTransform.java
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/NoWatermarkEmitTransform.java
new file mode 100644
index 000000000..798089733
--- /dev/null
+++
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/NoWatermarkEmitTransform.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.ir.vertex.transform;
+
+import org.apache.nemo.common.punctuation.Watermark;
+
+/**
+ * This transform does not emit watermarks.
+ * It may be a transform for a batch operation that emits its collected data
when {@link Transform#close()} is called.
+ * @param <I> input type
+ * @param <O> output type
+ */
+public abstract class NoWatermarkEmitTransform<I, O> implements Transform<I,
O> {
+
+ @Override
+ public final void onWatermark(final Watermark watermark) {
+ // do nothing
+ }
+
+}
diff --git
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
index b74dd8b36..b0dbe543d 100644
---
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
+++
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
@@ -19,6 +19,7 @@
package org.apache.nemo.common.ir.vertex.transform;
import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
/**
* A {@link Transform} relays input data from upstream vertex to downstream
vertex promptly.
@@ -45,6 +46,11 @@ public void onData(final T element) {
outputCollector.emit(element);
}
+ @Override
+ public void onWatermark(final Watermark watermark) {
+ outputCollector.emitWatermark(watermark);
+ }
+
@Override
public void close() {
// Do nothing.
diff --git
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
index 67eecde56..3df39ee9c 100644
---
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
+++
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
@@ -19,6 +19,8 @@
package org.apache.nemo.common.ir.vertex.transform;
import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+
import java.io.Serializable;
import java.util.Optional;
@@ -43,6 +45,14 @@
*/
void onData(I element);
+ /**
+ * On watermark received.
+ * This method should be called for the minimum watermark among input
streams (input watermark).
+ * Transform may emit collected data after receiving watermarks.
+ * @param watermark watermark
+ */
+ void onWatermark(Watermark watermark);
+
/**
* Close the transform.
*/
diff --git
a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
index 4f24a80f7..27ee53b3f 100644
--- a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
+++ b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
@@ -19,11 +19,13 @@
package org.apache.nemo.common.punctuation;
import java.io.Serializable;
+import java.util.Objects;
/**
* Watermark event.
*/
public final class Watermark implements Serializable {
+
private final long timestamp;
public Watermark(final long timestamp) {
this.timestamp = timestamp;
@@ -32,4 +34,21 @@ public Watermark(final long timestamp) {
public long getTimestamp() {
return timestamp;
}
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final Watermark watermark = (Watermark) o;
+ return timestamp == watermark.timestamp;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(timestamp);
+ }
}
diff --git
a/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
b/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
index 9c95bed3a..20bc14508 100644
--- a/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
+++ b/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
@@ -33,6 +33,7 @@
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.OperatorVertex;
import org.apache.nemo.common.ir.vertex.SourceVertex;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.beam.sdk.values.KV;
@@ -136,7 +137,7 @@ public Object extractKey(final Object element) {
* @param <I> input type.
* @param <O> output type.
*/
- public static class EmptyTransform<I, O> implements Transform<I, O> {
+ public static class EmptyTransform<I, O> extends NoWatermarkEmitTransform<I,
O> {
private final String name;
/**
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
index 564f157ca..92a1f1696 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -131,14 +131,12 @@ public final void prepare(final Context context, final
OutputCollector<WindowedV
sideInputReader = NullSideInputReader.of(sideInputs);
}
- // create step context
// this transform does not support state and timer.
final StepContext stepContext = new StepContext() {
@Override
public StateInternals stateInternals() {
throw new UnsupportedOperationException("Not support stateInternals in
DoFnTransform");
}
-
@Override
public TimerInternals timerInternals() {
throw new UnsupportedOperationException("Not support timerInternals in
DoFnTransform");
@@ -168,6 +166,10 @@ public TimerInternals timerInternals() {
doFnRunner.startBundle();
}
+ public final OutputCollector<WindowedValue<OutputT>> getOutputCollector() {
+ return outputCollector;
+ }
+
@Override
public final void close() {
beforeClose();
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
index 3093af1d0..d60bcfc4c 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
@@ -20,7 +20,7 @@
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.values.KV;
@@ -35,7 +35,7 @@
* @param <I> input type.
* @param <O> output type.
*/
-public final class CreateViewTransform<I, O> implements
Transform<WindowedValue<I>, WindowedValue<O>> {
+public final class CreateViewTransform<I, O> extends
NoWatermarkEmitTransform<WindowedValue<I>, WindowedValue<O>> {
private final PCollectionView pCollectionView;
private OutputCollector<WindowedValue<O>> outputCollector;
private final ViewFn<Materializations.MultimapView<Void, ?>, O> viewFn;
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
index bc3301621..4a57ada90 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
@@ -25,6 +25,7 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.punctuation.Watermark;
import java.util.Collection;
import java.util.List;
@@ -66,6 +67,13 @@ public void onData(final WindowedValue<InputT> data) {
getDoFnRunner().processElement(data);
}
+ @Override
+ public void onWatermark(final Watermark watermark) {
+ // TODO #216: We should consider push-back data that waits for side input
+ // TODO #216: If there are push-back data, input watermark >= output
watermark
+ getOutputCollector().emitWatermark(watermark);
+ }
+
@Override
protected void beforeClose() {
// nothing
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/FlattenTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/FlattenTransform.java
index 6389e33b6..082e5d7a1 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/FlattenTransform.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/FlattenTransform.java
@@ -20,6 +20,7 @@
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
/**
* Flatten transform implementation.
@@ -44,6 +45,11 @@ public void onData(final T element) {
outputCollector.emit(element);
}
+ @Override
+ public void onWatermark(final Watermark watermark) {
+ outputCollector.emitWatermark(watermark);
+ }
+
@Override
public void close() {
}
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
index 4a3d86628..57475bad6 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -28,6 +28,7 @@
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
+import org.apache.nemo.common.punctuation.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +40,7 @@
* @param <InputT> input type.
*/
public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
- extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>,
KV<K, Iterable<InputT>>> {
+ extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>, KV<K,
Iterable<InputT>>> {
private static final Logger LOG =
LoggerFactory.getLogger(GroupByKeyAndWindowDoFnTransform.class.getName());
private final SystemReduceFn reduceFn;
@@ -95,6 +96,12 @@ public void onData(final WindowedValue<KV<K, InputT>>
element) {
keyToValues.get(kv.getKey()).add(element.withValue(kv.getValue()));
}
+ @Override
+ public void onWatermark(final Watermark watermark) {
+ // TODO #230: Emit collected data when receiving watermark
+ // TODO #230: in GroupByKeyAndWindowTransform
+ }
+
/**
* This advances the input watermark and processing time to the timestamp
max value
* in order to emit all data.
@@ -196,4 +203,3 @@ public TimerInternals timerInternalsForKey(final K key) {
}
}
}
-
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
index 0aed8aaa7..4e8edf06c 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
@@ -20,7 +20,7 @@
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +31,7 @@
* Group Beam KVs.
* @param <I> input type.
*/
-public final class GroupByKeyTransform<I> implements Transform<I,
WindowedValue<KV<Object, List>>> {
+public final class GroupByKeyTransform<I> extends NoWatermarkEmitTransform<I,
WindowedValue<KV<Object, List>>> {
private static final Logger LOG =
LoggerFactory.getLogger(GroupByKeyTransform.class.getName());
private final Map<Object, List> keyToValues;
private OutputCollector<WindowedValue<KV<Object, List>>> outputCollector;
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
index d88faa098..a4346182e 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
@@ -25,6 +25,7 @@
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.nemo.common.punctuation.Watermark;
import org.joda.time.Instant;
import java.util.Collection;
@@ -88,6 +89,11 @@ public BoundedWindow window() {
}
}
+ @Override
+ public void onWatermark(final Watermark watermark) {
+ outputCollector.emitWatermark(watermark);
+ }
+
@Override
public void close() {
}
diff --git
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/CollectTransform.java
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/CollectTransform.java
index 5f5532a14..7f5aad676 100644
---
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/CollectTransform.java
+++
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/CollectTransform.java
@@ -19,7 +19,7 @@
package org.apache.nemo.compiler.frontend.spark.transform;
import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
import org.apache.commons.lang3.SerializationUtils;
import java.util.ArrayList;
@@ -27,9 +27,10 @@
/**
* Collect transform.
+ * This transform is used for Spark batch jobs, so it does not emit watermarks.
* @param <T> type of data to collect.
*/
-public final class CollectTransform<T> implements Transform<T, T> {
+public final class CollectTransform<T> extends NoWatermarkEmitTransform<T, T> {
private final ArrayList<T> list;
private Context ctxt;
diff --git
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/FlatMapTransform.java
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/FlatMapTransform.java
index 55dc9939c..aca41a0ce 100644
---
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/FlatMapTransform.java
+++
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/FlatMapTransform.java
@@ -20,6 +20,7 @@
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
import org.apache.spark.api.java.function.FlatMapFunction;
/**
@@ -53,6 +54,11 @@ public void onData(final T element) {
}
}
+ @Override
+ public void onWatermark(final Watermark watermark) {
+ outputCollector.emitWatermark(watermark);
+ }
+
@Override
public void close() {
}
diff --git
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/GroupByKeyTransform.java
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/GroupByKeyTransform.java
index f54226fde..7d39aaff6 100644
---
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/GroupByKeyTransform.java
+++
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/GroupByKeyTransform.java
@@ -19,6 +19,7 @@
package org.apache.nemo.compiler.frontend.spark.transform;
import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import scala.Tuple2;
@@ -29,7 +30,7 @@
* @param <K> key type.
* @param <V> value type.
*/
-public final class GroupByKeyTransform<K, V> implements Transform<Tuple2<K,
V>, Tuple2<K, Iterable<V>>> {
+public final class GroupByKeyTransform<K, V> extends
NoWatermarkEmitTransform<Tuple2<K, V>, Tuple2<K, Iterable<V>>> {
private final Map<K, List<V>> keyToValues;
private OutputCollector<Tuple2<K, Iterable<V>>> outputCollector;
diff --git
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/HDFSTextFileTransform.java
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/HDFSTextFileTransform.java
index be76c562a..cf5f14d80 100644
---
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/HDFSTextFileTransform.java
+++
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/HDFSTextFileTransform.java
@@ -19,6 +19,7 @@
package org.apache.nemo.compiler.frontend.spark.transform;
import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -34,7 +35,7 @@
* Transform which saves elements to a HDFS text file for Spark.
* @param <I> input type.
*/
-public final class HDFSTextFileTransform<I> implements Transform<I, String> {
+public final class HDFSTextFileTransform<I> extends
NoWatermarkEmitTransform<I, String> {
private final String path;
private Path fileName;
private List<I> elements;
diff --git
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/LocalTextFileTransform.java
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/LocalTextFileTransform.java
index e5b791df7..8f6cc8b3a 100644
---
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/LocalTextFileTransform.java
+++
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/LocalTextFileTransform.java
@@ -19,7 +19,7 @@
package org.apache.nemo.compiler.frontend.spark.transform;
import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
import java.io.*;
import java.util.ArrayList;
@@ -30,7 +30,7 @@
* Transform which saves elements to a local text file for Spark.
* @param <I> input type.
*/
-public final class LocalTextFileTransform<I> implements Transform<I, String> {
+public final class LocalTextFileTransform<I> extends
NoWatermarkEmitTransform<I, String> {
private final String path;
private String fileName;
private List<I> elements;
diff --git
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapToPairTransform.java
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapToPairTransform.java
index df14fbda0..32f3d121e 100644
---
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapToPairTransform.java
+++
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapToPairTransform.java
@@ -20,6 +20,7 @@
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
@@ -56,6 +57,11 @@ public void onData(final T element) {
}
}
+ @Override
+ public void onWatermark(final Watermark watermark) {
+ outputCollector.emitWatermark(watermark);
+ }
+
@Override
public void close() {
}
diff --git
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapTransform.java
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapTransform.java
index dda84c94b..0774b2cf8 100644
---
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapTransform.java
+++
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapTransform.java
@@ -20,6 +20,7 @@
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
import org.apache.spark.api.java.function.Function;
/**
@@ -53,6 +54,11 @@ public void onData(final I element) {
}
}
+ @Override
+ public void onWatermark(final Watermark watermark) {
+ outputCollector.emitWatermark(watermark);
+ }
+
@Override
public void close() {
}
diff --git
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
index 44f87b894..bea19d32c 100644
---
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
+++
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
@@ -19,7 +19,7 @@
package org.apache.nemo.compiler.frontend.spark.transform;
import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
import org.apache.spark.api.java.function.Function2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +32,7 @@
* @param <K> key type.
* @param <V> value type.
*/
-public final class ReduceByKeyTransform<K, V> implements Transform<Tuple2<K,
V>, Tuple2<K, V>> {
+public final class ReduceByKeyTransform<K, V> extends
NoWatermarkEmitTransform<Tuple2<K, V>, Tuple2<K, V>> {
private static final Logger LOG =
LoggerFactory.getLogger(ReduceByKeyTransform.class.getName());
private final Map<K, List<V>> keyToValues;
diff --git
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceTransform.java
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceTransform.java
index 4acd21158..2141017bb 100644
---
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceTransform.java
+++
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceTransform.java
@@ -20,6 +20,7 @@
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
import org.apache.spark.api.java.function.Function2;
import javax.annotation.Nullable;
@@ -68,6 +69,11 @@ public void onData(final T element) {
outputCollector.emit(result);
}
+ @Override
+ public void onWatermark(final Watermark watermark) {
+ outputCollector.emitWatermark(watermark);
+ }
+
/**
* Reduce the iterator elements into a single object.
* @param elements the iterator of elements.
diff --git
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
index bddf7c150..8189a48b4 100644
---
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
+++
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
@@ -32,6 +32,7 @@
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
import org.apache.reef.io.Tuple;
import org.junit.Before;
@@ -207,6 +208,11 @@ public void emit(WindowedValue<T> output) {
outputs.add(output);
}
+ @Override
+ public void emitWatermark(final Watermark watermark) {
+ // do nothing
+ }
+
@Override
public <O> void emit(String dstVertexId, O output) {
final WindowedValue<T> val = (WindowedValue<T>) output;
diff --git
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
index 0a64042ed..3f1bc9074 100644
---
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
+++
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
@@ -20,6 +20,7 @@
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.punctuation.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +44,11 @@ public void emit(final O output) {
nextOperatorVertex.getTransform().onData(output);
}
+ @Override
+ public void emitWatermark(final Watermark watermark) {
+ nextOperatorVertex.getTransform().onWatermark(watermark);
+ }
+
@Override
public <T> void emit(final String dstVertexId, final T output) {
throw new RuntimeException("No additional output tag in
DataFetcherOutputCollector");
diff --git
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
index 51cfc72a7..bbe4c727a 100644
---
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
+++
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
@@ -20,6 +20,7 @@
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.punctuation.Watermark;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.comm.ControlMessage;
import org.apache.nemo.runtime.common.message.MessageEnvironment;
@@ -79,6 +80,11 @@ public void emit(final O output) {
taskExecutor.setIRVertexPutOnHold(irVertex);
}
+ @Override
+ public void emitWatermark(final Watermark watermark) {
+ // do nothing
+ }
+
@Override
public <T> void emit(final String dstVertexId, final T output) {
throw new IllegalStateException("Dynamic optimization does not emit tagged
data");
diff --git
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
index 715fde8eb..598cc35ad 100644
---
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
+++
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
@@ -21,6 +21,7 @@
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.punctuation.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,4 +100,23 @@ public void emit(final O output) {
}
}
}
+
+ @Override
+ public void emitWatermark(final Watermark watermark) {
+ // Emit watermarks to internal vertices
+ // TODO #232: Implement InputWatermarkManager
+ // TODO #232: We should emit the minimum watermark among multiple input
streams of Transform.
+ for (final OperatorVertex internalVertex : internalMainOutputs) {
+ internalVertex.getTransform().onWatermark(watermark);
+ }
+
+ for (final List<OperatorVertex> internalVertices :
internalAdditionalOutputs.values()) {
+ for (final OperatorVertex internalVertex : internalVertices) {
+ internalVertex.getTransform().onWatermark(watermark);
+ }
+ }
+
+ // TODO #245: handle watermarks in OutputWriter
+ // TODO #245: currently ignore emitting watermarks to output writer
+ }
}
diff --git
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index 98600cdc1..96004106a 100644
---
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -156,6 +156,7 @@ public TaskExecutor(final Task task,
// Create a harness for each vertex
final List<DataFetcher> nonBroadcastDataFetcherList = new ArrayList<>();
final Map<String, VertexHarness> vertexIdToHarness = new HashMap<>();
+
reverseTopologicallySorted.forEach(irVertex -> {
final Optional<Readable> sourceReader = getSourceVertexReader(irVertex,
task.getIrVertexIdToReadable());
if (sourceReader.isPresent() != irVertex instanceof SourceVertex) {
@@ -248,8 +249,7 @@ private void processElement(final OutputCollector
outputCollector, final Object
}
private void processWatermark(final OutputCollector outputCollector, final
Watermark watermark) {
- // TODO #231: Add onWatermark() method to Transform and
- // TODO #231: fowards watermark to Transforms and OutputWriters
+ outputCollector.emitWatermark(watermark);
}
/**
diff --git
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index b7da5faa1..823366323 100644
---
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -38,6 +38,7 @@
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.punctuation.Watermark;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
import org.apache.nemo.runtime.common.plan.Stage;
@@ -214,7 +215,6 @@ public void clearInternalStates() {
};
final long watermark = 1234567L;
- final AtomicLong emittedWatermark = new AtomicLong(0);
final Readable readable = new Readable() {
int pointer = 0;
@@ -247,7 +247,6 @@ public void advance() throws IOException {
@Override
public long readWatermark() {
watermarkEmitted = true;
- emittedWatermark.set(watermark);
return watermark;
}
@@ -268,13 +267,19 @@ public void close() throws IOException {
final Map<String, Readable> vertexIdToReadable = new HashMap<>();
vertexIdToReadable.put(sourceIRVertex.getId(), readable);
+ final List<Watermark> emittedWatermarks = new LinkedList<>();
+
+ final Transform transform = new
RelayTransformNoWatermarkEmit(emittedWatermarks);
+ final OperatorVertex operatorVertex = new OperatorVertex(transform);
final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag =
new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>()
.addVertex(sourceIRVertex)
+ .addVertex(operatorVertex)
+ .connectVertices(createEdge(sourceIRVertex, operatorVertex, "edge1"))
.buildWithoutSourceSinkCheck();
- final StageEdge taskOutEdge = mockStageEdgeFrom(sourceIRVertex);
+ final StageEdge taskOutEdge = mockStageEdgeFrom(operatorVertex);
final Task task =
new Task(
"testSourceVertexDataFetching",
@@ -290,10 +295,10 @@ public void close() throws IOException {
taskExecutor.execute();
// Check whether the watermark is emitted
- assertEquals(watermark, emittedWatermark.get());
+ assertEquals(Arrays.asList(new Watermark(watermark)), emittedWatermarks);
// Check the output.
- assertTrue(checkEqualElements(elements,
runtimeEdgeToOutputData.get(taskOutEdge.getId())));
+ assertEquals(elements, runtimeEdgeToOutputData.get(taskOutEdge.getId()));
}
/**
@@ -562,6 +567,41 @@ public Object answer(final InvocationOnMock
invocationOnMock) throws Throwable {
}
}
+ /**
+ * This transform does not emit watermark to OutputWriter
+ * because OutputWriter currently does not support watermarks (TODO #245)
+ * @param <T> type
+ */
+ private class RelayTransformNoWatermarkEmit<T> implements Transform<T, T> {
+ private OutputCollector<T> outputCollector;
+ private final List<Watermark> emittedWatermarks;
+
+ RelayTransformNoWatermarkEmit(final List<Watermark> emittedWatermarks) {
+ this.emittedWatermarks = emittedWatermarks;
+ }
+
+ @Override
+ public void prepare(final Context context, final OutputCollector<T>
outputCollector) {
+ this.outputCollector = outputCollector;
+ }
+
+ @Override
+ public void onWatermark(Watermark watermark) {
+ emittedWatermarks.add(watermark);
+ }
+
+ @Override
+ public void onData(final Object element) {
+ outputCollector.emit((T) element);
+ }
+
+ @Override
+ public void close() {
+ // Do nothing.
+ }
+ }
+
+
/**
* Simple identity function for testing.
* @param <T> input/output type.
@@ -574,6 +614,11 @@ public void prepare(final Context context, final
OutputCollector<T> outputCollec
this.outputCollector = outputCollector;
}
+ @Override
+ public void onWatermark(Watermark watermark) {
+ outputCollector.emitWatermark(watermark);
+ }
+
@Override
public void onData(final Object element) {
outputCollector.emit((T) element);
@@ -599,6 +644,11 @@ public void prepare(final Context context, final
OutputCollector<List<T>> output
this.outputCollector = outputCollector;
}
+ @Override
+ public void onWatermark(Watermark watermark) {
+ // do nothing
+ }
+
@Override
public void onData(final Object element) {
list.add((T) element);
@@ -629,6 +679,11 @@ public void prepare(final Context context, final
OutputCollector<T> outputCollec
this.outputCollector = outputCollector;
}
+ @Override
+ public void onWatermark(Watermark watermark) {
+ outputCollector.emitWatermark(watermark);
+ }
+
@Override
public void onData(final Object element) {
final Object broadcastVariable =
context.getBroadcastVariable(broadcastVariableId);
@@ -669,6 +724,11 @@ public void onData(final Integer element) {
}
}
+ @Override
+ public void onWatermark(Watermark watermark) {
+ outputCollector.emitWatermark(watermark);
+ }
+
@Override
public void close() {
// Do nothing.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services