Repository: reef
Updated Branches:
  refs/heads/master 4a847c5c5 -> a541312f7


[REEF-806] Expose a sequence number for Evaluator messages

This patch:
 * Adds the Numberable interface to mark objects that can form
   ordered sequences, e.g. messages
 * Makes DriverSide TaskMessage and ContextMessage interfaces
   extend Numberable
 * In TaskMessageImpl and ContextMessageImpl, exposes the
   timestamp of an evaluator heartbeat as a sequence number via
   Numberable getSequenceNumber()

JIRA:
  [REEF-806](https://issues.apache.org/jira/browse/REEF-806)

Pull Request:
  This closes #737


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/a541312f
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/a541312f
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/a541312f

Branch: refs/heads/master
Commit: a541312f7dc061a46959217b485bae3696e1f937
Parents: 4a847c5
Author: [email protected] <[email protected]>
Authored: Wed Dec 16 11:35:48 2015 +0100
Committer: Markus Weimer <[email protected]>
Committed: Wed Dec 16 12:17:15 2015 -0800

----------------------------------------------------------------------
 .../reef/javabridge/ContextMessageBridge.java   |  8 +++++
 .../reef/driver/context/ContextMessage.java     | 24 +++++++++++++-
 .../apache/reef/driver/task/TaskMessage.java    | 24 +++++++++++++-
 .../java/org/apache/reef/io/Numberable.java     | 35 ++++++++++++++++++++
 .../driver/context/ContextMessageImpl.java      | 11 +++++-
 .../driver/context/ContextRepresenters.java     |  3 +-
 .../driver/evaluator/EvaluatorManager.java      |  8 +++--
 .../evaluator/pojos/ContextMessagePOJO.java     | 17 ++++++++--
 .../evaluator/pojos/ContextStatusPOJO.java      |  4 +--
 .../driver/evaluator/pojos/TaskMessagePOJO.java | 18 ++++++++--
 .../driver/evaluator/pojos/TaskStatusPOJO.java  |  4 +--
 .../common/driver/task/TaskMessageImpl.java     | 11 +++++-
 .../common/driver/task/TaskRepresenter.java     |  6 ++--
 13 files changed, 154 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ContextMessageBridge.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ContextMessageBridge.java
 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ContextMessageBridge.java
index 9cbdc86..ce473a8 100644
--- 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ContextMessageBridge.java
+++ 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ContextMessageBridge.java
@@ -18,6 +18,7 @@
  */
 package org.apache.reef.javabridge;
 
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.reef.annotations.audience.Interop;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.driver.context.ContextMessage;
@@ -62,4 +63,11 @@ public final class ContextMessageBridge extends NativeBridge 
implements ContextM
   public String getMessageSourceID() {
     return messageSourceId;
   }
+
+  @Override
+  public long getSequenceNumber(){
+    // TODO[REEF-1085] once REEF.NET supports sequence numbers, ensure the 
numbers
+    // can propagate between C# and Java implementations
+    throw new NotImplementedException("A Java-CLR bridge lacks support of 
sequence numbers on the REEF.NET side.");
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-common/src/main/java/org/apache/reef/driver/context/ContextMessage.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/context/ContextMessage.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/context/ContextMessage.java
index b3be7c3..a5912cd 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/context/ContextMessage.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/context/ContextMessage.java
@@ -22,6 +22,7 @@ import org.apache.reef.annotations.Provided;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Public;
 import org.apache.reef.io.Message;
+import org.apache.reef.io.Numberable;
 import org.apache.reef.io.naming.Identifiable;
 
 /**
@@ -30,7 +31,7 @@ import org.apache.reef.io.naming.Identifiable;
 @Public
 @DriverSide
 @Provided
-public interface ContextMessage extends Message, Identifiable {
+public interface ContextMessage extends Message, Identifiable, Numberable {
 
   /**
    * @return the message sent by the Context.
@@ -49,4 +50,25 @@ public interface ContextMessage extends Message, 
Identifiable {
    */
   String getMessageSourceID();
 
+  /**
+   * @return the sequence number of the message
+   *
+   * Terminology: a source sends a message to a target.
+   * Example sources are Tasks or Contexts. Example targets are Drivers.
+   *
+   * The method guarantees that sequence numbers increase strictly 
monotonically per message source.
+   * So numbers of messages from different sources should not be compared to 
each other.
+   *
+   * Clients of this method must not assume any particular method 
implementation
+   * because it may change in the future.
+   *
+   * The current implementation returns the timestamp of a heartbeat that 
contained the message.
+   * A heartbeat may contain several messages; all such messages will get the 
same sequence number.
+   * A source can attach only one message to a single heartbeat, so the strict 
sequence number monotonicity
+   * per source is guaranteed.
+   *
+   */
+  @Override
+  long getSequenceNumber();
+
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-common/src/main/java/org/apache/reef/driver/task/TaskMessage.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/task/TaskMessage.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/task/TaskMessage.java
index ef2e781..b59cc13 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/task/TaskMessage.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/task/TaskMessage.java
@@ -22,6 +22,7 @@ import org.apache.reef.annotations.Provided;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Public;
 import org.apache.reef.io.Message;
+import org.apache.reef.io.Numberable;
 import org.apache.reef.io.naming.Identifiable;
 
 /**
@@ -30,7 +31,7 @@ import org.apache.reef.io.naming.Identifiable;
 @DriverSide
 @Public
 @Provided
-public interface TaskMessage extends Message, Identifiable {
+public interface TaskMessage extends Message, Identifiable, Numberable {
 
   /**
    * @return the message.
@@ -45,6 +46,27 @@ public interface TaskMessage extends Message, Identifiable {
   String getId();
 
   /**
+   * @return the sequence number of the message
+   *
+   * Terminology: a source sends a message to a target.
+   * Example sources are Tasks or Contexts. Example targets are Drivers.
+   *
+   * The method guarantees that sequence numbers increase strictly 
monotonically per message source.
+   * So numbers of messages from different sources should not be compared to 
each other.
+   *
+   * Clients of this method must not assume any particular method 
implementation
+   * because it may change in the future.
+   *
+   * The current implementation returns the timestamp of a heartbeat that 
contained the message.
+   * A heartbeat may contain several messages; all such messages will get the 
same sequence number.
+   * A source can attach only one message to a single heartbeat, so the strict 
sequence number monotonicity
+   * per source is guaranteed.
+   *
+   */
+  @Override
+  long getSequenceNumber();
+
+  /**
    * @return the id of the context the sending task is running in.
    */
   String getContextId();

http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-common/src/main/java/org/apache/reef/io/Numberable.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/io/Numberable.java 
b/lang/java/reef-common/src/main/java/org/apache/reef/io/Numberable.java
new file mode 100644
index 0000000..de84217
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/io/Numberable.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.io;
+
+/**
+ * Interface for objects which can form ordered sequences, e.g. {@link 
Message}.
+ */
+public interface Numberable {
+
+  /**
+   *
+   * @return the number of an object in an ordered sequence of similar objects
+   *
+   * Guarantees of this method - such as partial or total order - are 
implementation-specific.
+   */
+  long getSequenceNumber();
+
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextMessageImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextMessageImpl.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextMessageImpl.java
index 4c79972..89b574e 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextMessageImpl.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextMessageImpl.java
@@ -28,14 +28,18 @@ import org.apache.reef.driver.context.ContextMessage;
 @Private
 @DriverSide
 public final class ContextMessageImpl implements ContextMessage {
+
   private final byte[] theMessage;
   private final String theContextID;
   private final String theMessageSourceId;
+  private final long sequenceNumber;
 
-  public ContextMessageImpl(final byte[] theMessage, final String 
theContextID, final String theMessageSourceId) {
+  public ContextMessageImpl(final byte[] theMessage, final String 
theContextID, final String theMessageSourceId,
+                            final long sequenceNumber) {
     this.theMessage = theMessage;
     this.theContextID = theContextID;
     this.theMessageSourceId = theMessageSourceId;
+    this.sequenceNumber = sequenceNumber;
   }
 
   @Override
@@ -52,4 +56,9 @@ public final class ContextMessageImpl implements 
ContextMessage {
   public String getMessageSourceID() {
     return this.theMessageSourceId;
   }
+
+  @Override
+  public long getSequenceNumber() {
+    return this.sequenceNumber;
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
index 5a117f9..6892a0c 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
@@ -198,7 +198,8 @@ public final class ContextRepresenters {
              contextMessage : contextStatus.getContextMessageList()) {
       final byte[] theMessage = contextMessage.getMessage();
       final String sourceID = contextMessage.getSourceId();
-      this.messageDispatcher.onContextMessage(new 
ContextMessageImpl(theMessage, contextID, sourceID));
+      final long sequenceNumber = contextMessage.getSequenceNumber();
+      this.messageDispatcher.onContextMessage(new 
ContextMessageImpl(theMessage, contextID, sourceID, sequenceNumber));
     }
 
   }

http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index cc107c3..fcefb6b 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -374,6 +374,9 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
         }
       }
 
+      // All messages from a heartbeat receive the heartbeat timestamp as a 
sequence number. See REEF-806.
+      final long messageSequenceNumber = 
evaluatorHeartbeatProto.getTimestamp();
+
       // Process the Evaluator status message
       if (evaluatorHeartbeatProto.hasEvaluatorStatus()) {
         EvaluatorStatusPOJO evaluatorStatus = new 
EvaluatorStatusPOJO(evaluatorHeartbeatProto.getEvaluatorStatus());
@@ -384,16 +387,15 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
       final boolean informClientOfNewContexts = 
!evaluatorHeartbeatProto.hasTaskStatus();
       final List<ContextStatusPOJO> contextStatusList = new ArrayList<>();
       for (ReefServiceProtos.ContextStatusProto proto : 
evaluatorHeartbeatProto.getContextStatusList()) {
-        contextStatusList.add(new ContextStatusPOJO(proto));
+        contextStatusList.add(new ContextStatusPOJO(proto, 
messageSequenceNumber));
       }
 
       this.contextRepresenters.onContextStatusMessages(contextStatusList,
           informClientOfNewContexts);
 
       // Process the Task status message
-
       if (evaluatorHeartbeatProto.hasTaskStatus()) {
-        TaskStatusPOJO taskStatus = new 
TaskStatusPOJO(evaluatorHeartbeatProto.getTaskStatus());
+        TaskStatusPOJO taskStatus = new 
TaskStatusPOJO(evaluatorHeartbeatProto.getTaskStatus(), messageSequenceNumber);
         this.onTaskStatusMessage(taskStatus);
       }
       LOG.log(Level.FINE, "DONE with evaluator heartbeat from Evaluator {0}", 
this.getId());

http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextMessagePOJO.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextMessagePOJO.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextMessagePOJO.java
index 3760719..c5aa0ea 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextMessagePOJO.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextMessagePOJO.java
@@ -21,6 +21,7 @@ package org.apache.reef.runtime.common.driver.evaluator.pojos;
 
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.io.Numberable;
 import org.apache.reef.proto.ReefServiceProtos;
 
 /**
@@ -28,15 +29,17 @@ import org.apache.reef.proto.ReefServiceProtos;
  */
 @DriverSide
 @Private
-public final class ContextMessagePOJO {
+public final class ContextMessagePOJO implements Numberable {
 
   private final byte[] message;
   private final String sourceId;
+  private final long sequenceNumber;
 
 
-  ContextMessagePOJO(final 
ReefServiceProtos.ContextStatusProto.ContextMessageProto proto){
+  ContextMessagePOJO(final 
ReefServiceProtos.ContextStatusProto.ContextMessageProto proto, final long 
sequenceNumber){
     message = proto.getMessage().toByteArray();
     sourceId = proto.getSourceId();
+    this.sequenceNumber = sequenceNumber;
   }
 
   /**
@@ -52,4 +55,14 @@ public final class ContextMessagePOJO {
   public String getSourceId(){
     return sourceId;
   }
+
+  /**
+   * @return the sequence number of a message
+   *
+   * @see org.apache.reef.driver.context.ContextMessage#getSequenceNumber()
+   */
+  @Override
+  public long getSequenceNumber(){
+    return this.sequenceNumber;
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextStatusPOJO.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextStatusPOJO.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextStatusPOJO.java
index a636ea5..99bf4a1 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextStatusPOJO.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextStatusPOJO.java
@@ -41,7 +41,7 @@ public final class ContextStatusPOJO {
   private final List<ContextMessagePOJO> contextMessages = new ArrayList<>();
 
 
-  public ContextStatusPOJO(final ReefServiceProtos.ContextStatusProto proto){
+  public ContextStatusPOJO(final ReefServiceProtos.ContextStatusProto proto, 
final long sequenceNumber){
 
     contextId = proto.getContextId();
     parentId = proto.hasParentId() ? proto.getParentId() : null;
@@ -49,7 +49,7 @@ public final class ContextStatusPOJO {
     contextState = proto.hasContextState()? 
getContextStateFromProto(proto.getContextState()) : null;
 
     for (final ContextMessageProto contextMessageProto : 
proto.getContextMessageList()) {
-      contextMessages.add(new ContextMessagePOJO(contextMessageProto));
+      contextMessages.add(new ContextMessagePOJO(contextMessageProto,  
sequenceNumber));
     }
 
   }

http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskMessagePOJO.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskMessagePOJO.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskMessagePOJO.java
index 84bb4d6..be9aa71 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskMessagePOJO.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskMessagePOJO.java
@@ -21,6 +21,7 @@ package org.apache.reef.runtime.common.driver.evaluator.pojos;
 
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.io.Numberable;
 import org.apache.reef.proto.ReefServiceProtos;
 
 /**
@@ -28,14 +29,16 @@ import org.apache.reef.proto.ReefServiceProtos;
  */
 @DriverSide
 @Private
-public final class TaskMessagePOJO {
+public final class TaskMessagePOJO implements Numberable {
 
   private final byte[] message;
   private final String sourceId;
+  private final long sequenceNumber;
 
-  TaskMessagePOJO(final ReefServiceProtos.TaskStatusProto.TaskMessageProto 
proto){
+  TaskMessagePOJO(final ReefServiceProtos.TaskStatusProto.TaskMessageProto 
proto, final long sequenceNumber) {
     message = proto.getMessage().toByteArray();
     sourceId = proto.getSourceId();
+    this.sequenceNumber = sequenceNumber;
   }
 
   /**
@@ -51,4 +54,15 @@ public final class TaskMessagePOJO {
   public String getSourceId(){
     return sourceId;
   }
+
+  /**
+   * @return the sequence number of a message
+   *
+   * @see org.apache.reef.driver.task.TaskMessage#getSequenceNumber()
+   */
+  @Override
+  public long getSequenceNumber(){
+    return this.sequenceNumber;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java
index 66ecdda..f2e3f2d 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java
@@ -40,7 +40,7 @@ public final class TaskStatusPOJO {
   private final byte[] result;
   private final List<TaskMessagePOJO> taskMessages = new ArrayList<>();
 
-  public TaskStatusPOJO(final ReefServiceProtos.TaskStatusProto proto){
+  public TaskStatusPOJO(final ReefServiceProtos.TaskStatusProto proto, final 
long sequenceNumber){
 
     taskId = proto.getTaskId();
     contextId = proto.getContextId();
@@ -48,7 +48,7 @@ public final class TaskStatusPOJO {
     result = proto.hasResult() ? proto.getResult().toByteArray() : null;
 
     for (final TaskMessageProto taskMessageProto : proto.getTaskMessageList()) 
{
-      taskMessages.add(new TaskMessagePOJO(taskMessageProto));
+      taskMessages.add(new TaskMessagePOJO(taskMessageProto, sequenceNumber));
     }
 
   }

http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskMessageImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskMessageImpl.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskMessageImpl.java
index 57ca425..7cfa2d9 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskMessageImpl.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskMessageImpl.java
@@ -33,13 +33,16 @@ public final class TaskMessageImpl implements TaskMessage {
   private final String taskId;
   private final String contextId;
   private final String theMessageSourceId;
+  private final long   sequenceNumber;
 
   public TaskMessageImpl(final byte[] theMessage, final String taskId,
-                         final String contextId, final String 
theMessageSourceId) {
+                         final String contextId, final String 
theMessageSourceId,
+                         final long sequenceNumber) {
     this.theMessage = theMessage;
     this.taskId = taskId;
     this.contextId = contextId;
     this.theMessageSourceId = theMessageSourceId;
+    this.sequenceNumber = sequenceNumber;
   }
 
   @Override
@@ -61,4 +64,10 @@ public final class TaskMessageImpl implements TaskMessage {
   public String getMessageSourceID() {
     return this.theMessageSourceId;
   }
+
+  @Override
+  public long getSequenceNumber() {
+    return this.sequenceNumber;
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
index 89d60a3..fc3d6e5 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
@@ -149,10 +149,10 @@ public final class TaskRepresenter {
     }
 
     for (final TaskMessagePOJO
-             taskMessageProto : taskStatus.getTaskMessageList()) {
+             taskMessagePOJO : taskStatus.getTaskMessageList()) {
       this.messageDispatcher.onTaskMessage(
-          new TaskMessageImpl(taskMessageProto.getMessage(),
-              this.taskId, this.context.getId(), 
taskMessageProto.getSourceId()));
+          new TaskMessageImpl(taskMessagePOJO.getMessage(),
+              this.taskId, this.context.getId(), 
taskMessagePOJO.getSourceId(), taskMessagePOJO.getSequenceNumber()));
     }
   }
 

Reply via email to