This is an automated email from the ASF dual-hosted git repository.

wonook pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new d36733260 [MINOR] Expose Latency metrics (#324)
d36733260 is described below

commit d36733260fc2f245e0516e89c3c25c573db0f058
Author: Won Wook SONG <wsong0...@gmail.com>
AuthorDate: Mon Aug 1 15:24:16 2022 +0900

    [MINOR] Expose Latency metrics (#324)
    
    **Major changes:**
    - Fix for exposing latency metrics on the metrics log
    
    **Minor changes to note:**
    -
    
    **Tests for the changes:**
    - Existing tests pass.
    
    **Other comments:**
    -
    
    Closes #324
---
 .../org/apache/nemo/common/ir/OutputCollector.java |  4 +--
 .../vertex/transform/LatencymarkEmitTransform.java |  6 ++--
 .../vertex/transform/NoWatermarkEmitTransform.java |  4 +--
 .../nemo/common/ir/vertex/transform/Transform.java |  4 +--
 .../{Latencymark.java => LatencyMark.java}         | 34 ++++++++++++++--------
 .../beam/transform/AbstractDoFnTransform.java      |  4 +--
 .../frontend/beam/transform/GBKTransform.java      |  4 +--
 .../beam/transform/TestOutputCollector.java        | 10 +++----
 .../nemo/runtime/common/metric/LatencyMetric.java  | 17 ++++++++---
 .../nemo/runtime/common/metric/TaskMetric.java     |  8 +++--
 .../executor/datatransfer/BlockOutputWriter.java   |  4 +--
 .../datatransfer/DataFetcherOutputCollector.java   |  4 +--
 .../datatransfer/NemoEventDecoderFactory.java      |  4 +--
 .../datatransfer/NemoEventEncoderFactory.java      |  4 +--
 .../OperatorVertexOutputCollector.java             |  4 +--
 .../datatransfer/OperatorWatermarkCollector.java   |  4 +--
 .../executor/datatransfer/OutputWriter.java        |  4 +--
 .../executor/datatransfer/PipeOutputWriter.java    |  4 +--
 .../RunTimeMessageOutputCollector.java             |  4 +--
 .../task/MultiThreadParentTaskDataFetcher.java     |  4 +--
 .../executor/task/SourceVertexDataFetcher.java     |  4 +--
 .../nemo/runtime/executor/task/TaskExecutor.java   | 12 ++++----
 .../runtime/executor/task/TaskExecutorTest.java    | 28 +++++++++---------
 23 files changed, 102 insertions(+), 77 deletions(-)

diff --git 
a/common/src/main/java/org/apache/nemo/common/ir/OutputCollector.java 
b/common/src/main/java/org/apache/nemo/common/ir/OutputCollector.java
index 916a06f3f..e90de4bee 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/OutputCollector.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/OutputCollector.java
@@ -18,7 +18,7 @@
  */
 package org.apache.nemo.common.ir;
 
-import org.apache.nemo.common.punctuation.Latencymark;
+import org.apache.nemo.common.punctuation.LatencyMark;
 import org.apache.nemo.common.punctuation.Watermark;
 
 import java.io.Serializable;
@@ -50,7 +50,7 @@ public interface OutputCollector<O> extends Serializable {
    *
    * @param latencymark latencymark
    */
-  void emitLatencymark(Latencymark latencymark);
+  void emitLatencymark(LatencyMark latencymark);
 
   /**
    * Multi-destination emit.
diff --git 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/LatencymarkEmitTransform.java
 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/LatencymarkEmitTransform.java
index 44640a144..eaa60b2ba 100644
--- 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/LatencymarkEmitTransform.java
+++ 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/LatencymarkEmitTransform.java
@@ -19,10 +19,10 @@
 package org.apache.nemo.common.ir.vertex.transform;
 
 import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.punctuation.Latencymark;
+import org.apache.nemo.common.punctuation.LatencyMark;
 
 /**
- * This transform emits {@link Latencymark}.
+ * This transform emits {@link LatencyMark}.
  *
  * @param <I> input type
  * @param <O> output type
@@ -50,7 +50,7 @@ public abstract class LatencymarkEmitTransform<I, O> 
implements Transform<I, O>
    * @param latencymark latencymark
    */
   @Override
-  public final void onLatencymark(final Latencymark latencymark) {
+  public final void onLatencymark(final LatencyMark latencymark) {
     outputCollector.emitLatencymark(latencymark);
   }
 }
diff --git 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/NoWatermarkEmitTransform.java
 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/NoWatermarkEmitTransform.java
index 7f8b9d1e1..029ba8337 100644
--- 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/NoWatermarkEmitTransform.java
+++ 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/NoWatermarkEmitTransform.java
@@ -18,7 +18,7 @@
  */
 package org.apache.nemo.common.ir.vertex.transform;
 
-import org.apache.nemo.common.punctuation.Latencymark;
+import org.apache.nemo.common.punctuation.LatencyMark;
 import org.apache.nemo.common.punctuation.Watermark;
 
 /**
@@ -39,7 +39,7 @@ public abstract class NoWatermarkEmitTransform<I, O> 
implements Transform<I, O>
   }
 
   @Override
-  public final void onLatencymark(final Latencymark latencymark) {
+  public final void onLatencymark(final LatencyMark latencymark) {
     // do nothing
   }
 }
diff --git 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
index d16ab5c24..dd00d82af 100644
--- 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
+++ 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
@@ -19,7 +19,7 @@
 package org.apache.nemo.common.ir.vertex.transform;
 
 import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.punctuation.Latencymark;
+import org.apache.nemo.common.punctuation.LatencyMark;
 import org.apache.nemo.common.punctuation.Watermark;
 
 import java.io.Serializable;
@@ -63,7 +63,7 @@ public interface Transform<I, O> extends Serializable {
    *
    * @param latencymark latencymark.
    */
-  void onLatencymark(Latencymark latencymark);
+  void onLatencymark(LatencyMark latencymark);
 
   /**
    * Close the transform.
diff --git 
a/common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java 
b/common/src/main/java/org/apache/nemo/common/punctuation/LatencyMark.java
similarity index 63%
rename from 
common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
rename to 
common/src/main/java/org/apache/nemo/common/punctuation/LatencyMark.java
index a331e1932..56f26c348 100644
--- a/common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
+++ b/common/src/main/java/org/apache/nemo/common/punctuation/LatencyMark.java
@@ -22,10 +22,13 @@ import java.io.Serializable;
 import java.util.Objects;
 
 /**
- * Latency mark is conveyor that has data for debugging.
- * It is created only from source vertex and record the timestamp when it is 
created and taskId where it is created.
+ * Latency mark is a watermark with the data related to stream data latencies.
+ * It is created only from source vertex, with the data of when (timestamp) 
and where (taskId) it was created.
+ * Later tasks can infer the latency from the time at which the latencyMark arrives at the task.
+ * When the latencyMark arrives in a task, it leaves its record with its task 
id and timestamp, and
+ * later tasks can track the itinerary by looking at the recorded previous 
task id and timestamp.
  */
-public final class Latencymark implements Serializable {
+public final class LatencyMark implements Serializable {
   private final String createdTaskId;
   private final long createdTimestamp;
   private String previousTaskId;
@@ -33,10 +36,10 @@ public final class Latencymark implements Serializable {
 
 
   /**
-   * @param taskId task id where it is created
-   * @param timestamp timestamp when it is created
+   * @param taskId task id of where it was created
+   * @param timestamp timestamp of when it was created
    */
-  public Latencymark(final String taskId, final long timestamp) {
+  public LatencyMark(final String taskId, final long timestamp) {
     this.createdTaskId = taskId;
     this.createdTimestamp = timestamp;
     this.previousTaskId = "";
@@ -44,30 +47,37 @@ public final class Latencymark implements Serializable {
   }
 
   /**
-   * @return the latencymark timestamp
+   * @return the timestamp of when this latencyMark was first created.
    */
   public long getCreatedTimestamp() {
     return createdTimestamp;
   }
 
   /**
-   * @return the task id where it is created
+   * @return the task id of where it was created.
    */
   public String getCreatedTaskId() {
     return createdTaskId;
   }
 
   /**
-   * @return the task id of previous task
+   * @return the id of the task that previously passed on this latency mark.
    */
   public String getPreviousTaskId() {
     return previousTaskId;
   }
 
+  /**
+   * @return the timestamp at which a task previously passed on this latency mark.
+   */
+  public long getPreviousSentTimestamp() {
+    return previousSentTimestamp;
+  }
+
   /**
    * Set the previousTaskId.
    *
-   * @param taskId the task id.
+   * @param taskId the id of the task that this latencyMark previously went through.
    */
   public void setPreviousTaskId(final String taskId) {
     previousTaskId = taskId;
@@ -76,7 +86,7 @@ public final class Latencymark implements Serializable {
   /**
    * Set the previousSentTimestamp.
    *
-   * @param timestamp the timestamp.
+   * @param timestamp the timestamp of when this latencyMark was sent from a 
previous task.
    */
   public void setPreviousSentTimestamp(final long timestamp) {
     previousSentTimestamp = timestamp;
@@ -90,7 +100,7 @@ public final class Latencymark implements Serializable {
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    final Latencymark latencymark = (Latencymark) o;
+    final LatencyMark latencymark = (LatencyMark) o;
     return (createdTimestamp == latencymark.createdTimestamp)
       && (createdTaskId.equals(latencymark.createdTaskId)
       && (previousTaskId.equals(latencymark.previousTaskId)));
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
index 1cd68999b..84d9b450f 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -33,7 +33,7 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
-import org.apache.nemo.common.punctuation.Latencymark;
+import org.apache.nemo.common.punctuation.LatencyMark;
 import org.apache.nemo.compiler.frontend.beam.InMemorySideInputReader;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
 import org.slf4j.Logger;
@@ -319,7 +319,7 @@ public abstract class AbstractDoFnTransform<InputT, InterT, 
OutputT> implements
    * @param latencymark latencymark.
    */
   @Override
-  public void onLatencymark(final Latencymark latencymark) {
+  public void onLatencymark(final LatencyMark latencymark) {
     getOutputCollector().emitLatencymark(latencymark);
   }
 
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
index 9194edbbc..d07b504a9 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
@@ -31,7 +31,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.punctuation.Latencymark;
+import org.apache.nemo.common.punctuation.LatencyMark;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -302,7 +302,7 @@ public final class GBKTransform<K, InputT, OutputT>
 
     /** Emit latencymark. */
     @Override
-    public final void emitLatencymark(final Latencymark latencymark) {
+    public final void emitLatencymark(final LatencyMark latencymark) {
       oc.emitLatencymark(latencymark);
     }
 
diff --git 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/TestOutputCollector.java
 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/TestOutputCollector.java
index 914ccf50c..8f49ec678 100644
--- 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/TestOutputCollector.java
+++ 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/TestOutputCollector.java
@@ -20,7 +20,7 @@ package org.apache.nemo.compiler.frontend.beam.transform;
 
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.punctuation.Latencymark;
+import org.apache.nemo.common.punctuation.LatencyMark;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.reef.io.Tuple;
 
@@ -36,13 +36,13 @@ final class TestOutputCollector<T> implements 
OutputCollector<WindowedValue<T>>
   public final List<WindowedValue<T>> outputs;
   public final List<Tuple<String, WindowedValue<T>>> taggedOutputs;
   public final List<Watermark> watermarks;
-  public final List<Latencymark> latencymarks;
+  public final List<LatencyMark> latencyMarks;
 
   TestOutputCollector() {
     this.outputs = new LinkedList<>();
     this.taggedOutputs = new LinkedList<>();
     this.watermarks = new LinkedList<>();
-    this.latencymarks = new LinkedList<>();
+    this.latencyMarks = new LinkedList<>();
   }
 
   @Override
@@ -56,8 +56,8 @@ final class TestOutputCollector<T> implements 
OutputCollector<WindowedValue<T>>
   }
 
   @Override
-  public void emitLatencymark(Latencymark latencymark) {
-    latencymarks.add(latencymark);
+  public void emitLatencymark(LatencyMark latencymark) {
+    latencyMarks.add(latencymark);
   }
 
   @Override
diff --git 
a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/LatencyMetric.java
 
b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/LatencyMetric.java
index a5fb93e6f..ea4dd1ea7 100644
--- 
a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/LatencyMetric.java
+++ 
b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/LatencyMetric.java
@@ -18,7 +18,7 @@
  */
 package org.apache.nemo.runtime.common.metric;
 
-import org.apache.nemo.common.punctuation.Latencymark;
+import org.apache.nemo.common.punctuation.LatencyMark;
 
 import java.io.Serializable;
 
@@ -27,8 +27,9 @@ import java.io.Serializable;
  * The traversal time can be calculated by comparing the time when the 
latencymark was created with the time recorded.
  */
 public final class LatencyMetric implements Serializable {
-  private final Latencymark latencymark;
+  private final LatencyMark latencymark;
   private final long timestamp;
+  private final long latency;
 
   /**
    * Constructor with the latencymark and timestamp.
@@ -36,9 +37,10 @@ public final class LatencyMetric implements Serializable {
    * @param latencymark the latencymark to record.
    * @param timestamp When the latencymark was received.
    */
-  public LatencyMetric(final Latencymark latencymark, final long timestamp) {
+  public LatencyMetric(final LatencyMark latencymark, final long timestamp) {
     this.latencymark = latencymark;
     this.timestamp = timestamp;
+    this.latency = timestamp - latencymark.getCreatedTimestamp();
   }
 
   /**
@@ -46,7 +48,7 @@ public final class LatencyMetric implements Serializable {
    *
    * @return latency mark.
    */
-  public Latencymark getLatencymark() {
+  public LatencyMark getLatencymark() {
     return latencymark;
   }
 
@@ -58,4 +60,11 @@ public final class LatencyMetric implements Serializable {
   public long getTimestamp() {
     return this.timestamp;
   }
+
+  /**
+   * @return the latency.
+   */
+  public long getLatency() {
+    return latency;
+  }
 }
diff --git 
a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java
 
b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java
index 552153036..f1b642526 100644
--- 
a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java
+++ 
b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java
@@ -133,7 +133,11 @@ public class TaskMetric implements 
StateMetric<TaskState.State> {
     }
   }
 
-  private void addLatencymark(final LatencyMetric latencyMetric) {
+  public final Map<String, List<LatencyMetric>> getLatencyMetric() {
+    return latencymarks;
+  }
+
+  private void setLatencyMetric(final LatencyMetric latencyMetric) {
     
latencymarks.putIfAbsent(latencyMetric.getLatencymark().getPreviousTaskId(), 
new LinkedList<>());
     
latencymarks.get(latencyMetric.getLatencymark().getPreviousTaskId()).add(latencyMetric);
   }
@@ -297,7 +301,7 @@ public class TaskMetric implements 
StateMetric<TaskState.State> {
         setStreamMetric(SerializationUtils.deserialize(metricValue));
         break;
       case "latencymark":
-        addLatencymark(SerializationUtils.deserialize(metricValue));
+        setLatencyMetric(SerializationUtils.deserialize(metricValue));
         break;
       case "taskDuration":
         setTaskDuration(SerializationUtils.deserialize(metricValue));
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
index 4bc0e5adb..c756d6ea7 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
@@ -23,7 +23,7 @@ import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import org.apache.nemo.common.partitioner.DedicatedKeyPerElement;
 import org.apache.nemo.common.partitioner.Partitioner;
-import org.apache.nemo.common.punctuation.Latencymark;
+import org.apache.nemo.common.punctuation.LatencyMark;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.plan.RuntimeEdge;
@@ -101,7 +101,7 @@ public final class BlockOutputWriter implements 
OutputWriter {
   }
 
   @Override
-  public void writeLatencymark(final Latencymark latencymark) {
+  public void writeLatencymark(final LatencyMark latencymark) {
     // do nothing
   }
 
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
index 42cf62489..3b76b9732 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
@@ -20,7 +20,7 @@ package org.apache.nemo.runtime.executor.datatransfer;
 
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.OperatorVertex;
-import org.apache.nemo.common.punctuation.Latencymark;
+import org.apache.nemo.common.punctuation.LatencyMark;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,7 +62,7 @@ public final class DataFetcherOutputCollector<O> implements 
OutputCollector<O> {
   }
 
   @Override
-  public void emitLatencymark(final Latencymark latencymark) {
+  public void emitLatencymark(final LatencyMark latencymark) {
     nextOperatorVertex.getTransform().onLatencymark(latencymark);
   }
 
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java
index ad26a142c..e6c15fd73 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java
@@ -20,7 +20,7 @@ package org.apache.nemo.runtime.executor.datatransfer;
 
 import org.apache.commons.lang.SerializationUtils;
 import org.apache.nemo.common.coder.DecoderFactory;
-import org.apache.nemo.common.punctuation.Latencymark;
+import org.apache.nemo.common.punctuation.LatencyMark;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,7 +86,7 @@ public final class NemoEventDecoderFactory implements 
DecoderFactory {
           (WatermarkWithIndex) SerializationUtils.deserialize(inputStream);
         return watermarkWithIndex;
       } else if (dataType == 0x02) {
-        final Latencymark latencymark = (Latencymark) 
SerializationUtils.deserialize(inputStream);
+        final LatencyMark latencymark = (LatencyMark) 
SerializationUtils.deserialize(inputStream);
         return latencymark;
       } else {
         throw new RuntimeException("Element decoding failure: " + dataType);
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java
index 5f9636085..d88256a6b 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java
@@ -20,7 +20,7 @@ package org.apache.nemo.runtime.executor.datatransfer;
 
 import org.apache.commons.lang.SerializationUtils;
 import org.apache.nemo.common.coder.EncoderFactory;
-import org.apache.nemo.common.punctuation.Latencymark;
+import org.apache.nemo.common.punctuation.LatencyMark;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,7 +72,7 @@ public final class NemoEventEncoderFactory implements 
EncoderFactory {
       if (element instanceof WatermarkWithIndex) {
         outputStream.write(0x01); // this is watermark
         outputStream.write(SerializationUtils.serialize((Serializable) 
element));
-      } else if (element instanceof Latencymark) {
+      } else if (element instanceof LatencyMark) {
         outputStream.write(0x02);
         outputStream.write(SerializationUtils.serialize((Serializable) 
element));
       } else {
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
index 319e0796b..05b84b3a2 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
@@ -21,7 +21,7 @@ package org.apache.nemo.runtime.executor.datatransfer;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.OperatorVertex;
-import org.apache.nemo.common.punctuation.Latencymark;
+import org.apache.nemo.common.punctuation.LatencyMark;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -134,7 +134,7 @@ public final class OperatorVertexOutputCollector<O> 
implements OutputCollector<O
   }
 
   @Override
-  public void emitLatencymark(final Latencymark latencymark) {
+  public void emitLatencymark(final LatencyMark latencymark) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("{} emits latencymark {}", irVertex.getId(), latencymark);
     }
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorWatermarkCollector.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorWatermarkCollector.java
index 082d04899..d4ad0cb93 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorWatermarkCollector.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorWatermarkCollector.java
@@ -20,7 +20,7 @@ package org.apache.nemo.runtime.executor.datatransfer;
 
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.OperatorVertex;
-import org.apache.nemo.common.punctuation.Latencymark;
+import org.apache.nemo.common.punctuation.LatencyMark;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,7 +49,7 @@ public final class OperatorWatermarkCollector implements 
OutputCollector {
   }
 
   @Override
-  public void emitLatencymark(final Latencymark latencymakr) {
+  public void emitLatencymark(final LatencyMark latencymakr) {
     throw new IllegalStateException("Should not be called");
   }
 
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
index 586a0488c..3ff5cde99 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -18,7 +18,7 @@
  */
 package org.apache.nemo.runtime.executor.datatransfer;
 
-import org.apache.nemo.common.punctuation.Latencymark;
+import org.apache.nemo.common.punctuation.LatencyMark;
 import org.apache.nemo.common.punctuation.Watermark;
 
 import java.util.Optional;
@@ -48,7 +48,7 @@ public interface OutputWriter {
    *
    * @param latencymark latencymark
    */
-  void writeLatencymark(Latencymark latencymark);
+  void writeLatencymark(LatencyMark latencymark);
 
   /**
    * @return the total written bytes.
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
index c6c82f0ce..4f12bfbbe 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
@@ -20,7 +20,7 @@ package org.apache.nemo.runtime.executor.datatransfer;
 
 import 
org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
 import org.apache.nemo.common.partitioner.Partitioner;
-import org.apache.nemo.common.punctuation.Latencymark;
+import org.apache.nemo.common.punctuation.LatencyMark;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.plan.RuntimeEdge;
@@ -110,7 +110,7 @@ public final class PipeOutputWriter implements OutputWriter 
{
   }
 
   @Override
-  public void writeLatencymark(final Latencymark latencymark) {
+  public void writeLatencymark(final LatencyMark latencymark) {
     if (!initialized) {
       doInitialize();
     }
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/RunTimeMessageOutputCollector.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/RunTimeMessageOutputCollector.java
index 7bd9af45f..433f65afa 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/RunTimeMessageOutputCollector.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/RunTimeMessageOutputCollector.java
@@ -20,7 +20,7 @@ package org.apache.nemo.runtime.executor.datatransfer;
 
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.IRVertex;
-import org.apache.nemo.common.punctuation.Latencymark;
+import org.apache.nemo.common.punctuation.LatencyMark;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.comm.ControlMessage;
@@ -104,7 +104,7 @@ public final class RunTimeMessageOutputCollector<O> 
implements OutputCollector<O
   }
 
   @Override
-  public void emitLatencymark(final Latencymark latencymark) {
+  public void emitLatencymark(final LatencyMark latencymark) {
     // do nothing
   }
 
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
index 9b393591f..e1999b374 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
@@ -22,7 +22,7 @@ import org.apache.nemo.common.Pair;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.punctuation.Finishmark;
-import org.apache.nemo.common.punctuation.Latencymark;
+import org.apache.nemo.common.punctuation.LatencyMark;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.runtime.executor.data.DataUtil;
 import org.apache.nemo.runtime.executor.datatransfer.*;
@@ -213,7 +213,7 @@ class MultiThreadParentTaskDataFetcher extends DataFetcher {
     }
 
     @Override
-    public void emitLatencymark(final Latencymark latencymark) {
+    public void emitLatencymark(final LatencyMark latencymark) {
       throw new IllegalStateException("Should not be called");
     }
 
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
index d42099e18..a736a1f65 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
@@ -22,7 +22,7 @@ import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.Readable;
 import org.apache.nemo.common.ir.vertex.SourceVertex;
 import org.apache.nemo.common.punctuation.Finishmark;
-import org.apache.nemo.common.punctuation.Latencymark;
+import org.apache.nemo.common.punctuation.LatencyMark;
 import org.apache.nemo.common.punctuation.Watermark;
 
 import java.util.concurrent.Executors;
@@ -122,7 +122,7 @@ class SourceVertexDataFetcher extends DataFetcher {
       if (isWatermarkTriggerTime()) {
         return new Watermark(readable.readWatermark());
       } else if (isLatencyMarkTriggered()) {
-        return new Latencymark(taskId, System.currentTimeMillis());
+        return new LatencyMark(taskId, System.currentTimeMillis());
       }
     }
 
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index 7b8f99bc7..e81af3d03 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -34,7 +34,7 @@ import org.apache.nemo.common.ir.vertex.transform.MessageAggregatorTransform;
 import org.apache.nemo.common.ir.vertex.transform.SignalTransform;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.nemo.common.punctuation.Finishmark;
-import org.apache.nemo.common.punctuation.Latencymark;
+import org.apache.nemo.common.punctuation.LatencyMark;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.comm.ControlMessage;
@@ -386,7 +386,7 @@ public final class TaskExecutor {
   }
 
   private void processLatencymark(final OutputCollector outputCollector,
-                                final Latencymark latencymark) {
+                                final LatencyMark latencymark) {
     outputCollector.emitLatencymark(latencymark);
   }
 
@@ -486,13 +486,15 @@ public final class TaskExecutor {
         serializedReadBytes += ((MultiThreadParentTaskDataFetcher) dataFetcher).getSerializedBytes();
         encodedReadBytes += ((MultiThreadParentTaskDataFetcher) dataFetcher).getEncodedBytes();
       }
-    } else if (event instanceof Latencymark) {
-      Latencymark latencymark = (Latencymark) event;
+    } else if (event instanceof LatencyMark) {
+      LatencyMark latencymark = (LatencyMark) event;
       long currTimestamp = System.currentTimeMillis();
 
       // send latencyMetric to RuntimeMaster
       LatencyMetric metric = new LatencyMetric(latencymark, currTimestamp);
-      metricMessageSender.send(TASK_METRIC_ID, taskId, "latencymark", SerializationUtils.serialize(metric));
+      if (metric.getLatency() > 0) {
+        metricMessageSender.send(TASK_METRIC_ID, taskId, "latencymark", SerializationUtils.serialize(metric));
+      }
 
       long latestSentTimestamp = latestSentLatencymarkTimestamp.getOrDefault(latencymark.getCreatedTaskId(), -1L);
       if (latestSentTimestamp < latencymark.getCreatedTimestamp()) {
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index abadc005b..375ac8097 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -37,7 +37,7 @@ import org.apache.nemo.common.ir.vertex.OperatorVertex;
 import org.apache.nemo.common.ir.vertex.SourceVertex;
 import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
-import org.apache.nemo.common.punctuation.Latencymark;
+import org.apache.nemo.common.punctuation.LatencyMark;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
@@ -226,9 +226,9 @@ public final class TaskExecutorTest {
     final Map<String, Readable> vertexIdToReadable = new HashMap<>();
     vertexIdToReadable.put(sourceIRVertex.getId(), readable);
     final List<Watermark> emittedWatermarks = new LinkedList<>();
-    final List<Latencymark> emittedLatencymarks = new LinkedList<>();
+    final List<LatencyMark> emittedLatencyMarks = new LinkedList<>();
 
-    final Transform transform = new StreamTransformNoWatermarkEmit(emittedWatermarks, emittedLatencymarks);
+    final Transform transform = new StreamTransformNoWatermarkEmit(emittedWatermarks, emittedLatencyMarks);
     final OperatorVertex operatorVertex = new OperatorVertex(transform);
 
     final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag =
@@ -319,9 +319,9 @@ public final class TaskExecutorTest {
   @Test()
   public void testMultipleIncomingEdges() throws Exception {
     final List<Watermark> emittedWatermarks = new ArrayList<>();
-    final List<Latencymark> emittedLatencymarks = new ArrayList<>();
+    final List<LatencyMark> emittedLatencyMarks = new ArrayList<>();
     final IRVertex operatorIRVertex1 = new OperatorVertex(new StreamTransform());
-    final IRVertex operatorIRVertex2 = new OperatorVertex(new StreamTransformNoWatermarkEmit(emittedWatermarks, emittedLatencymarks));
+    final IRVertex operatorIRVertex2 = new OperatorVertex(new StreamTransformNoWatermarkEmit(emittedWatermarks, emittedLatencyMarks));
     final IRVertex operatorIRVertex3 = new OperatorVertex(new StreamTransform());
 
     final IRVertex sourceIRVertex1 = new TestUnboundedSourceVertex();
@@ -630,11 +630,11 @@ public final class TaskExecutorTest {
   private class StreamTransformNoWatermarkEmit<T> implements Transform<T, T> {
     private OutputCollector<T> outputCollector;
     private final List<Watermark> emittedWatermarks;
-    private final List<Latencymark> emittedLatencymarks;
+    private final List<LatencyMark> emittedLatencyMarks;
 
-    StreamTransformNoWatermarkEmit(final List<Watermark> emittedWatermarks, final List<Latencymark> emittedLatencymarks) {
+    StreamTransformNoWatermarkEmit(final List<Watermark> emittedWatermarks, final List<LatencyMark> emittedLatencyMarks) {
       this.emittedWatermarks = emittedWatermarks;
-      this.emittedLatencymarks = emittedLatencymarks;
+      this.emittedLatencyMarks = emittedLatencyMarks;
     }
 
     @Override
@@ -648,8 +648,8 @@ public final class TaskExecutorTest {
     }
 
     @Override
-    public void onLatencymark(Latencymark latencymark) {
-      emittedLatencymarks.add(latencymark);
+    public void onLatencymark(LatencyMark latencymark) {
+      emittedLatencyMarks.add(latencymark);
     }
 
     @Override
@@ -775,7 +775,7 @@ public final class TaskExecutorTest {
     }
 
     @Override
-    public void onLatencymark(Latencymark latencymark) {
+    public void onLatencymark(LatencyMark latencymark) {
       outputCollector.emitLatencymark(latencymark);
     }
 
@@ -811,7 +811,7 @@ public final class TaskExecutorTest {
     }
 
     @Override
-    public void onLatencymark(Latencymark latencymark) {
+    public void onLatencymark(LatencyMark latencymark) {
       // do nothing
     }
 
@@ -852,7 +852,7 @@ public final class TaskExecutorTest {
     }
 
     @Override
-    public void onLatencymark(Latencymark latencymark) {
+    public void onLatencymark(LatencyMark latencymark) {
       outputCollector.emitLatencymark(latencymark);
     }
 
@@ -902,7 +902,7 @@ public final class TaskExecutorTest {
     }
 
     @Override
-    public void onLatencymark(Latencymark latencymark) {
+    public void onLatencymark(LatencyMark latencymark) {
       outputCollector.emitLatencymark(latencymark);
     }
 

Reply via email to