This is an automated email from the ASF dual-hosted git repository. wonook pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push: new d36733260 [MINOR] Expose Latency metrics (#324) d36733260 is described below commit d36733260fc2f245e0516e89c3c25c573db0f058 Author: Won Wook SONG <wsong0...@gmail.com> AuthorDate: Mon Aug 1 15:24:16 2022 +0900 [MINOR] Expose Latency metrics (#324) **Major changes:** - Fix for exposing latency metrics on the metrics log **Minor changes to note:** - **Tests for the changes:** - Existing tests pass. **Other comments:** - Closes #324 --- .../org/apache/nemo/common/ir/OutputCollector.java | 4 +-- .../vertex/transform/LatencymarkEmitTransform.java | 6 ++-- .../vertex/transform/NoWatermarkEmitTransform.java | 4 +-- .../nemo/common/ir/vertex/transform/Transform.java | 4 +-- .../{Latencymark.java => LatencyMark.java} | 34 ++++++++++++++-------- .../beam/transform/AbstractDoFnTransform.java | 4 +-- .../frontend/beam/transform/GBKTransform.java | 4 +-- .../beam/transform/TestOutputCollector.java | 10 +++---- .../nemo/runtime/common/metric/LatencyMetric.java | 17 ++++++++--- .../nemo/runtime/common/metric/TaskMetric.java | 8 +++-- .../executor/datatransfer/BlockOutputWriter.java | 4 +-- .../datatransfer/DataFetcherOutputCollector.java | 4 +-- .../datatransfer/NemoEventDecoderFactory.java | 4 +-- .../datatransfer/NemoEventEncoderFactory.java | 4 +-- .../OperatorVertexOutputCollector.java | 4 +-- .../datatransfer/OperatorWatermarkCollector.java | 4 +-- .../executor/datatransfer/OutputWriter.java | 4 +-- .../executor/datatransfer/PipeOutputWriter.java | 4 +-- .../RunTimeMessageOutputCollector.java | 4 +-- .../task/MultiThreadParentTaskDataFetcher.java | 4 +-- .../executor/task/SourceVertexDataFetcher.java | 4 +-- .../nemo/runtime/executor/task/TaskExecutor.java | 12 ++++---- .../runtime/executor/task/TaskExecutorTest.java | 28 +++++++++--------- 23 files changed, 102 insertions(+), 77 deletions(-) diff --git a/common/src/main/java/org/apache/nemo/common/ir/OutputCollector.java b/common/src/main/java/org/apache/nemo/common/ir/OutputCollector.java index 916a06f3f..e90de4bee 100644 --- a/common/src/main/java/org/apache/nemo/common/ir/OutputCollector.java +++ b/common/src/main/java/org/apache/nemo/common/ir/OutputCollector.java @@ -18,7 +18,7 @@ */ package org.apache.nemo.common.ir; -import org.apache.nemo.common.punctuation.Latencymark; +import org.apache.nemo.common.punctuation.LatencyMark; import org.apache.nemo.common.punctuation.Watermark; import java.io.Serializable; @@ -50,7 +50,7 @@ public interface OutputCollector<O> extends Serializable { * * @param latencymark latencymark */ - void emitLatencymark(Latencymark latencymark); + void emitLatencymark(LatencyMark latencymark); /** * Multi-destination emit. diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/LatencymarkEmitTransform.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/LatencymarkEmitTransform.java index 44640a144..eaa60b2ba 100644 --- a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/LatencymarkEmitTransform.java +++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/LatencymarkEmitTransform.java @@ -19,10 +19,10 @@ package org.apache.nemo.common.ir.vertex.transform; import org.apache.nemo.common.ir.OutputCollector; -import org.apache.nemo.common.punctuation.Latencymark; +import org.apache.nemo.common.punctuation.LatencyMark; /** - * This transform emits {@link Latencymark}. + * This transform emits {@link LatencyMark}. * * @param <I> input type * @param <O> output type @@ -50,7 +50,7 @@ public abstract class LatencymarkEmitTransform<I, O> implements Transform<I, O> * @param latencymark latencymark */ @Override - public final void onLatencymark(final Latencymark latencymark) { + public final void onLatencymark(final LatencyMark latencymark) { outputCollector.emitLatencymark(latencymark); } } diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/NoWatermarkEmitTransform.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/NoWatermarkEmitTransform.java index 7f8b9d1e1..029ba8337 100644 --- a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/NoWatermarkEmitTransform.java +++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/NoWatermarkEmitTransform.java @@ -18,7 +18,7 @@ */ package org.apache.nemo.common.ir.vertex.transform; -import org.apache.nemo.common.punctuation.Latencymark; +import org.apache.nemo.common.punctuation.LatencyMark; import org.apache.nemo.common.punctuation.Watermark; /** @@ -39,7 +39,7 @@ public abstract class NoWatermarkEmitTransform<I, O> implements Transform<I, O> } @Override - public final void onLatencymark(final Latencymark latencymark) { + public final void onLatencymark(final LatencyMark latencymark) { // do nothing } } diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java index d16ab5c24..dd00d82af 100644 --- a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java +++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java @@ -19,7 +19,7 @@ package org.apache.nemo.common.ir.vertex.transform; import org.apache.nemo.common.ir.OutputCollector; -import org.apache.nemo.common.punctuation.Latencymark; +import org.apache.nemo.common.punctuation.LatencyMark; import org.apache.nemo.common.punctuation.Watermark; import java.io.Serializable; @@ -63,7 +63,7 @@ public interface Transform<I, O> extends Serializable { * * @param latencymark latencymark. */ - void onLatencymark(Latencymark latencymark); + void onLatencymark(LatencyMark latencymark); /** * Close the transform. diff --git a/common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java b/common/src/main/java/org/apache/nemo/common/punctuation/LatencyMark.java similarity index 63% rename from common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java rename to common/src/main/java/org/apache/nemo/common/punctuation/LatencyMark.java index a331e1932..56f26c348 100644 --- a/common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java +++ b/common/src/main/java/org/apache/nemo/common/punctuation/LatencyMark.java @@ -22,10 +22,13 @@ import java.io.Serializable; import java.util.Objects; /** - * Latency mark is conveyor that has data for debugging. - * It is created only from source vertex and record the timestamp when it is created and taskId where it is created. + * Latency mark is a watermark with the data related to stream data latencies. + * It is created only from source vertex, with the data of when (timestamp) and where (taskId) it was created. + * Later tasks can infer the latency according to the time that the latencyMark arrives to the task. + * When the latencyMark arrives in a task, it leaves its record with its task id and timestamp, and + * later tasks can track the itinerary by looking at the recorded previous task id and timestamp. */ -public final class Latencymark implements Serializable { +public final class LatencyMark implements Serializable { private final String createdTaskId; private final long createdTimestamp; private String previousTaskId; @@ -33,10 +36,10 @@ public final class Latencymark implements Serializable { /** - * @param taskId task id where it is created - * @param timestamp timestamp when it is created + * @param taskId task id of where it was created + * @param timestamp timestamp of when it was created */ - public Latencymark(final String taskId, final long timestamp) { + public LatencyMark(final String taskId, final long timestamp) { this.createdTaskId = taskId; this.createdTimestamp = timestamp; this.previousTaskId = ""; @@ -44,30 +47,37 @@ public final class Latencymark implements Serializable { } /** - * @return the latencymark timestamp + * @return the timestamp of when this latencyMark was first created. */ public long getCreatedTimestamp() { return createdTimestamp; } /** - * @return the task id where it is created + * @return the task id of where it was created. */ public String getCreatedTaskId() { return createdTaskId; } /** - * @return the task id of previous task + * @return the task id of task that this latency mark was previously passed on by. */ public String getPreviousTaskId() { return previousTaskId; } + /** + * @return the time stamp when this latency mark was previously passed on by a task. + */ + public long getPreviousSentTimestamp() { + return previousSentTimestamp; + } + /** * Set the previousTaskId. * - * @param taskId the task id. + * @param taskId the task id of where this latencyMark has gone through previously. */ public void setPreviousTaskId(final String taskId) { previousTaskId = taskId; @@ -76,7 +86,7 @@ public final class Latencymark implements Serializable { /** * Set the previousSentTimestamp. * - * @param timestamp the timestamp. + * @param timestamp the timestamp of when this latencyMark was sent from a previous task. */ public void setPreviousSentTimestamp(final long timestamp) { previousSentTimestamp = timestamp; @@ -90,7 +100,7 @@ public final class Latencymark implements Serializable { if (o == null || getClass() != o.getClass()) { return false; } - final Latencymark latencymark = (Latencymark) o; + final LatencyMark latencymark = (LatencyMark) o; return (createdTimestamp == latencymark.createdTimestamp) && (createdTaskId.equals(latencymark.createdTaskId) && (previousTaskId.equals(latencymark.previousTaskId))); diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java index 1cd68999b..84d9b450f 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java @@ -33,7 +33,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.nemo.common.ir.OutputCollector; import org.apache.nemo.common.ir.vertex.transform.Transform; -import org.apache.nemo.common.punctuation.Latencymark; +import org.apache.nemo.common.punctuation.LatencyMark; import org.apache.nemo.compiler.frontend.beam.InMemorySideInputReader; import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions; import org.slf4j.Logger; @@ -319,7 +319,7 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements * @param latencymark latencymark. */ @Override - public void onLatencymark(final Latencymark latencymark) { + public void onLatencymark(final LatencyMark latencymark) { getOutputCollector().emitLatencymark(latencymark); } diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java index 9194edbbc..d07b504a9 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java @@ -31,7 +31,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.nemo.common.ir.OutputCollector; -import org.apache.nemo.common.punctuation.Latencymark; +import org.apache.nemo.common.punctuation.LatencyMark; import org.apache.nemo.common.punctuation.Watermark; import org.joda.time.Instant; import org.slf4j.Logger; @@ -302,7 +302,7 @@ public final class GBKTransform<K, InputT, OutputT> /** Emit latencymark. */ @Override - public final void emitLatencymark(final Latencymark latencymark) { + public final void emitLatencymark(final LatencyMark latencymark) { oc.emitLatencymark(latencymark); } diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/TestOutputCollector.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/TestOutputCollector.java index 914ccf50c..8f49ec678 100644 --- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/TestOutputCollector.java +++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/TestOutputCollector.java @@ -20,7 +20,7 @@ package org.apache.nemo.compiler.frontend.beam.transform; import org.apache.beam.sdk.util.WindowedValue; import org.apache.nemo.common.ir.OutputCollector; -import org.apache.nemo.common.punctuation.Latencymark; +import org.apache.nemo.common.punctuation.LatencyMark; import org.apache.nemo.common.punctuation.Watermark; import org.apache.reef.io.Tuple; @@ -36,13 +36,13 @@ final class TestOutputCollector<T> implements OutputCollector<WindowedValue<T>> public final List<WindowedValue<T>> outputs; public final List<Tuple<String, WindowedValue<T>>> taggedOutputs; public final List<Watermark> watermarks; - public final List<Latencymark> latencymarks; + public final List<LatencyMark> latencyMarks; TestOutputCollector() { this.outputs = new LinkedList<>(); this.taggedOutputs = new LinkedList<>(); this.watermarks = new LinkedList<>(); - this.latencymarks = new LinkedList<>(); + this.latencyMarks = new LinkedList<>(); } @Override @@ -56,8 +56,8 @@ final class TestOutputCollector<T> implements OutputCollector<WindowedValue<T>> } @Override - public void emitLatencymark(Latencymark latencymark) { - latencymarks.add(latencymark); + public void emitLatencymark(LatencyMark latencymark) { + latencyMarks.add(latencymark); } @Override diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/LatencyMetric.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/LatencyMetric.java index a5fb93e6f..ea4dd1ea7 100644 --- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/LatencyMetric.java +++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/LatencyMetric.java @@ -18,7 +18,7 @@ */ package org.apache.nemo.runtime.common.metric; -import org.apache.nemo.common.punctuation.Latencymark; +import org.apache.nemo.common.punctuation.LatencyMark; import java.io.Serializable; @@ -27,8 +27,9 @@ import java.io.Serializable; * The traversal time can be calculated by comparing the time when the latencymark was created with the time recorded. */ public final class LatencyMetric implements Serializable { - private final Latencymark latencymark; + private final LatencyMark latencymark; private final long timestamp; + private final long latency; /** * Constructor with the latencymark and timestamp. @@ -36,9 +37,10 @@ public final class LatencyMetric implements Serializable { * @param latencymark the latencymark to record. * @param timestamp When the latencymark was received. */ - public LatencyMetric(final Latencymark latencymark, final long timestamp) { + public LatencyMetric(final LatencyMark latencymark, final long timestamp) { this.latencymark = latencymark; this.timestamp = timestamp; + this.latency = timestamp - latencymark.getCreatedTimestamp(); } /** @@ -46,7 +48,7 @@ public final class LatencyMetric implements Serializable { * * @return latency mark. */ - public Latencymark getLatencymark() { + public LatencyMark getLatencymark() { return latencymark; } @@ -58,4 +60,11 @@ public final class LatencyMetric implements Serializable { public long getTimestamp() { return this.timestamp; } + + /** + * @return the latency. + */ + public long getLatency() { + return latency; + } } diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java index 552153036..f1b642526 100644 --- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java +++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java @@ -133,7 +133,11 @@ public class TaskMetric implements StateMetric<TaskState.State> { } } - private void addLatencymark(final LatencyMetric latencyMetric) { + public final Map<String, List<LatencyMetric>> getLatencyMetric() { + return latencymarks; + } + + private void setLatencyMetric(final LatencyMetric latencyMetric) { latencymarks.putIfAbsent(latencyMetric.getLatencymark().getPreviousTaskId(), new LinkedList<>()); latencymarks.get(latencyMetric.getLatencymark().getPreviousTaskId()).add(latencyMetric); } @@ -297,7 +301,7 @@ public class TaskMetric implements StateMetric<TaskState.State> { setStreamMetric(SerializationUtils.deserialize(metricValue)); break; case "latencymark": - addLatencymark(SerializationUtils.deserialize(metricValue)); + setLatencyMetric(SerializationUtils.deserialize(metricValue)); break; case "taskDuration": setTaskDuration(SerializationUtils.deserialize(metricValue)); diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java index 4bc0e5adb..c756d6ea7 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java @@ -23,7 +23,7 @@ import org.apache.nemo.common.ir.vertex.IRVertex; import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty; import org.apache.nemo.common.partitioner.DedicatedKeyPerElement; import org.apache.nemo.common.partitioner.Partitioner; -import org.apache.nemo.common.punctuation.Latencymark; +import org.apache.nemo.common.punctuation.LatencyMark; import org.apache.nemo.common.punctuation.Watermark; import org.apache.nemo.runtime.common.RuntimeIdManager; import org.apache.nemo.runtime.common.plan.RuntimeEdge; @@ -101,7 +101,7 @@ public final class BlockOutputWriter implements OutputWriter { } @Override - public void writeLatencymark(final Latencymark latencymark) { + public void writeLatencymark(final LatencyMark latencymark) { // do nothing } diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java index 42cf62489..3b76b9732 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java @@ -20,7 +20,7 @@ package org.apache.nemo.runtime.executor.datatransfer; import org.apache.nemo.common.ir.OutputCollector; import org.apache.nemo.common.ir.vertex.OperatorVertex; -import org.apache.nemo.common.punctuation.Latencymark; +import org.apache.nemo.common.punctuation.LatencyMark; import org.apache.nemo.common.punctuation.Watermark; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,7 +62,7 @@ public final class DataFetcherOutputCollector<O> implements OutputCollector<O> { } @Override - public void emitLatencymark(final Latencymark latencymark) { + public void emitLatencymark(final LatencyMark latencymark) { nextOperatorVertex.getTransform().onLatencymark(latencymark); } diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java index ad26a142c..e6c15fd73 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java @@ -20,7 +20,7 @@ package org.apache.nemo.runtime.executor.datatransfer; import org.apache.commons.lang.SerializationUtils; import org.apache.nemo.common.coder.DecoderFactory; -import org.apache.nemo.common.punctuation.Latencymark; +import org.apache.nemo.common.punctuation.LatencyMark; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,7 +86,7 @@ public final class NemoEventDecoderFactory implements DecoderFactory { (WatermarkWithIndex) SerializationUtils.deserialize(inputStream); return watermarkWithIndex; } else if (dataType == 0x02) { - final Latencymark latencymark = (Latencymark) SerializationUtils.deserialize(inputStream); + final LatencyMark latencymark = (LatencyMark) SerializationUtils.deserialize(inputStream); return latencymark; } else { throw new RuntimeException("Element decoding failure: " + dataType); diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java index 5f9636085..d88256a6b 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java @@ -20,7 +20,7 @@ package org.apache.nemo.runtime.executor.datatransfer; import org.apache.commons.lang.SerializationUtils; import org.apache.nemo.common.coder.EncoderFactory; -import org.apache.nemo.common.punctuation.Latencymark; +import org.apache.nemo.common.punctuation.LatencyMark; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +72,7 @@ public final class NemoEventEncoderFactory implements EncoderFactory { if (element instanceof WatermarkWithIndex) { outputStream.write(0x01); // this is watermark outputStream.write(SerializationUtils.serialize((Serializable) element)); - } else if (element instanceof Latencymark) { + } else if (element instanceof LatencyMark) { outputStream.write(0x02); outputStream.write(SerializationUtils.serialize((Serializable) element)); } else { diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java index 319e0796b..05b84b3a2 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java @@ -21,7 +21,7 @@ package org.apache.nemo.runtime.executor.datatransfer; import org.apache.nemo.common.ir.OutputCollector; import org.apache.nemo.common.ir.vertex.IRVertex; import org.apache.nemo.common.ir.vertex.OperatorVertex; -import org.apache.nemo.common.punctuation.Latencymark; +import org.apache.nemo.common.punctuation.LatencyMark; import org.apache.nemo.common.punctuation.Watermark; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -134,7 +134,7 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O } @Override - public void emitLatencymark(final Latencymark latencymark) { + public void emitLatencymark(final LatencyMark latencymark) { if (LOG.isDebugEnabled()) { LOG.debug("{} emits latencymark {}", irVertex.getId(), latencymark); } diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorWatermarkCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorWatermarkCollector.java index 082d04899..d4ad0cb93 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorWatermarkCollector.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorWatermarkCollector.java @@ -20,7 +20,7 @@ package org.apache.nemo.runtime.executor.datatransfer; import org.apache.nemo.common.ir.OutputCollector; import org.apache.nemo.common.ir.vertex.OperatorVertex; -import org.apache.nemo.common.punctuation.Latencymark; +import org.apache.nemo.common.punctuation.LatencyMark; import org.apache.nemo.common.punctuation.Watermark; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +49,7 @@ public final class OperatorWatermarkCollector implements OutputCollector { } @Override - public void emitLatencymark(final Latencymark latencymakr) { + public void emitLatencymark(final LatencyMark latencymakr) { throw new IllegalStateException("Should not be called"); } diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java index 586a0488c..3ff5cde99 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java @@ -18,7 +18,7 @@ */ package org.apache.nemo.runtime.executor.datatransfer; -import org.apache.nemo.common.punctuation.Latencymark; +import org.apache.nemo.common.punctuation.LatencyMark; import org.apache.nemo.common.punctuation.Watermark; import java.util.Optional; @@ -48,7 +48,7 @@ public interface OutputWriter { * * @param latencymark latencymark */ - void writeLatencymark(Latencymark latencymark); + void writeLatencymark(LatencyMark latencymark); /** * @return the total written bytes. diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java index c6c82f0ce..4f12bfbbe 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java @@ -20,7 +20,7 @@ package org.apache.nemo.runtime.executor.datatransfer; import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty; import org.apache.nemo.common.partitioner.Partitioner; -import org.apache.nemo.common.punctuation.Latencymark; +import org.apache.nemo.common.punctuation.LatencyMark; import org.apache.nemo.common.punctuation.Watermark; import org.apache.nemo.runtime.common.RuntimeIdManager; import org.apache.nemo.runtime.common.plan.RuntimeEdge; @@ -110,7 +110,7 @@ public final class PipeOutputWriter implements OutputWriter { } @Override - public void writeLatencymark(final Latencymark latencymark) { + public void writeLatencymark(final LatencyMark latencymark) { if (!initialized) { doInitialize(); } diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/RunTimeMessageOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/RunTimeMessageOutputCollector.java index 7bd9af45f..433f65afa 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/RunTimeMessageOutputCollector.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/RunTimeMessageOutputCollector.java @@ -20,7 +20,7 @@ package org.apache.nemo.runtime.executor.datatransfer; import org.apache.nemo.common.ir.OutputCollector; import org.apache.nemo.common.ir.vertex.IRVertex; -import org.apache.nemo.common.punctuation.Latencymark; +import org.apache.nemo.common.punctuation.LatencyMark; import org.apache.nemo.common.punctuation.Watermark; import org.apache.nemo.runtime.common.RuntimeIdManager; import org.apache.nemo.runtime.common.comm.ControlMessage; @@ -104,7 +104,7 @@ public final class RunTimeMessageOutputCollector<O> implements OutputCollector<O } @Override - public void emitLatencymark(final Latencymark latencymark) { + public void emitLatencymark(final LatencyMark latencymark) { // do nothing } diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java index 9b393591f..e1999b374 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java @@ -22,7 +22,7 @@ import org.apache.nemo.common.Pair; import org.apache.nemo.common.ir.OutputCollector; import org.apache.nemo.common.ir.vertex.IRVertex; import org.apache.nemo.common.punctuation.Finishmark; -import org.apache.nemo.common.punctuation.Latencymark; +import org.apache.nemo.common.punctuation.LatencyMark; import org.apache.nemo.common.punctuation.Watermark; import org.apache.nemo.runtime.executor.data.DataUtil; import org.apache.nemo.runtime.executor.datatransfer.*; @@ -213,7 +213,7 @@ class MultiThreadParentTaskDataFetcher extends DataFetcher { } @Override - public void emitLatencymark(final Latencymark latencymark) { + public void emitLatencymark(final LatencyMark latencymark) { throw new IllegalStateException("Should not be called"); } diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java index d42099e18..a736a1f65 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java @@ -22,7 +22,7 @@ import org.apache.nemo.common.ir.OutputCollector; import org.apache.nemo.common.ir.Readable; import org.apache.nemo.common.ir.vertex.SourceVertex; import org.apache.nemo.common.punctuation.Finishmark; -import org.apache.nemo.common.punctuation.Latencymark; +import org.apache.nemo.common.punctuation.LatencyMark; import org.apache.nemo.common.punctuation.Watermark; import java.util.concurrent.Executors; @@ -122,7 +122,7 @@ class SourceVertexDataFetcher extends DataFetcher { if (isWatermarkTriggerTime()) { return new Watermark(readable.readWatermark()); } else if (isLatencyMarkTriggered()) { - return new Latencymark(taskId, System.currentTimeMillis()); + return new LatencyMark(taskId, System.currentTimeMillis()); } } diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java index 7b8f99bc7..e81af3d03 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java @@ -34,7 +34,7 @@ import org.apache.nemo.common.ir.vertex.transform.MessageAggregatorTransform; import org.apache.nemo.common.ir.vertex.transform.SignalTransform; import org.apache.nemo.common.ir.vertex.transform.Transform; import org.apache.nemo.common.punctuation.Finishmark; -import org.apache.nemo.common.punctuation.Latencymark; +import org.apache.nemo.common.punctuation.LatencyMark; import org.apache.nemo.common.punctuation.Watermark; import org.apache.nemo.runtime.common.RuntimeIdManager; import org.apache.nemo.runtime.common.comm.ControlMessage; @@ -386,7 +386,7 @@ public final class TaskExecutor { } private void processLatencymark(final OutputCollector outputCollector, - final Latencymark latencymark) { + final LatencyMark latencymark) { outputCollector.emitLatencymark(latencymark); } @@ -486,13 +486,15 @@ public final class TaskExecutor { serializedReadBytes += ((MultiThreadParentTaskDataFetcher) dataFetcher).getSerializedBytes(); encodedReadBytes += ((MultiThreadParentTaskDataFetcher) dataFetcher).getEncodedBytes(); } - } else if (event instanceof Latencymark) { - Latencymark latencymark = (Latencymark) event; + } else if (event instanceof LatencyMark) { + LatencyMark latencymark = (LatencyMark) event; long currTimestamp = System.currentTimeMillis(); // send latencyMetric to RuntimeMaster LatencyMetric metric = new LatencyMetric(latencymark, currTimestamp); - metricMessageSender.send(TASK_METRIC_ID, taskId, "latencymark", SerializationUtils.serialize(metric)); + if (metric.getLatency() > 0) { + metricMessageSender.send(TASK_METRIC_ID, taskId, "latencymark", SerializationUtils.serialize(metric)); + } long latestSentTimestamp = latestSentLatencymarkTimestamp.getOrDefault(latencymark.getCreatedTaskId(), -1L); if (latestSentTimestamp < latencymark.getCreatedTimestamp()) { diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java index abadc005b..375ac8097 100644 --- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java +++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java @@ -37,7 +37,7 @@ import org.apache.nemo.common.ir.vertex.OperatorVertex; import org.apache.nemo.common.ir.vertex.SourceVertex; import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty; import org.apache.nemo.common.ir.vertex.transform.Transform; -import org.apache.nemo.common.punctuation.Latencymark; +import org.apache.nemo.common.punctuation.LatencyMark; import org.apache.nemo.common.punctuation.Watermark; import org.apache.nemo.runtime.common.RuntimeIdManager; import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap; @@ -226,9 +226,9 @@ public final class TaskExecutorTest { final Map<String, Readable> vertexIdToReadable = new HashMap<>(); vertexIdToReadable.put(sourceIRVertex.getId(), readable); final List<Watermark> emittedWatermarks = new LinkedList<>(); - final List<Latencymark> emittedLatencymarks = new LinkedList<>(); + final List<LatencyMark> emittedLatencyMarks = new LinkedList<>(); - final Transform transform = new StreamTransformNoWatermarkEmit(emittedWatermarks, emittedLatencymarks); + final Transform transform = new StreamTransformNoWatermarkEmit(emittedWatermarks, emittedLatencyMarks); final OperatorVertex operatorVertex = new OperatorVertex(transform); final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag = @@ -319,9 +319,9 @@ public final class TaskExecutorTest { @Test() public void testMultipleIncomingEdges() throws Exception { final List<Watermark> emittedWatermarks = new ArrayList<>(); - final List<Latencymark> emittedLatencymarks = new ArrayList<>(); + final List<LatencyMark> emittedLatencyMarks = new ArrayList<>(); final IRVertex operatorIRVertex1 = new OperatorVertex(new StreamTransform()); - final IRVertex operatorIRVertex2 = new OperatorVertex(new StreamTransformNoWatermarkEmit(emittedWatermarks, emittedLatencymarks)); + final IRVertex operatorIRVertex2 = new OperatorVertex(new StreamTransformNoWatermarkEmit(emittedWatermarks, emittedLatencyMarks)); final IRVertex operatorIRVertex3 = new OperatorVertex(new StreamTransform()); final IRVertex sourceIRVertex1 = new TestUnboundedSourceVertex(); @@ -630,11 +630,11 @@ public final class TaskExecutorTest { private class StreamTransformNoWatermarkEmit<T> implements Transform<T, T> { private OutputCollector<T> outputCollector; private final List<Watermark> emittedWatermarks; - private final List<Latencymark> emittedLatencymarks; + private final List<LatencyMark> emittedLatencyMarks; - StreamTransformNoWatermarkEmit(final List<Watermark> emittedWatermarks, final List<Latencymark> emittedLatencymarks) { + StreamTransformNoWatermarkEmit(final List<Watermark> emittedWatermarks, final List<LatencyMark> emittedLatencyMarks) { this.emittedWatermarks = emittedWatermarks; - this.emittedLatencymarks = emittedLatencymarks; + this.emittedLatencyMarks = emittedLatencyMarks; } @Override @@ -648,8 +648,8 @@ public final class TaskExecutorTest { } @Override - public void onLatencymark(Latencymark latencymark) { - emittedLatencymarks.add(latencymark); + public void onLatencymark(LatencyMark latencymark) { + emittedLatencyMarks.add(latencymark); } @Override @@ -775,7 +775,7 @@ public final class TaskExecutorTest { } @Override - public void onLatencymark(Latencymark latencymark) { + public void onLatencymark(LatencyMark latencymark) { outputCollector.emitLatencymark(latencymark); } @@ -811,7 +811,7 @@ public final class TaskExecutorTest { } @Override - public void onLatencymark(Latencymark latencymark) { + public void onLatencymark(LatencyMark latencymark) { // do nothing } @@ -852,7 +852,7 @@ public final class TaskExecutorTest { } @Override - public void onLatencymark(Latencymark latencymark) { + public void onLatencymark(LatencyMark latencymark) { outputCollector.emitLatencymark(latencymark); } @@ -902,7 +902,7 @@ public final class TaskExecutorTest { } @Override - public void onLatencymark(Latencymark latencymark) { + public void onLatencymark(LatencyMark latencymark) { outputCollector.emitLatencymark(latencymark); }