Repository: reef Updated Branches: refs/heads/master 4a847c5c5 -> a541312f7
[REEF-806] Expose a sequence number for Evaluator messages This patch: * Adds the Numberable interface to mark objects that can form ordered sequences, e.g. messages * Makes DriverSide TaskMessage and ContextMessage interfaces extend Numberable * In TaskMessageImpl and ContextMessageImpl, exposes the timestamp of an evaluator heartbeat as a sequence number via Numberable getSequenceNumber() JIRA: [REEF-806](https://issues.apache.org/jira/browse/REEF-806) Pull Request: This closes #737 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/a541312f Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/a541312f Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/a541312f Branch: refs/heads/master Commit: a541312f7dc061a46959217b485bae3696e1f937 Parents: 4a847c5 Author: [email protected] <[email protected]> Authored: Wed Dec 16 11:35:48 2015 +0100 Committer: Markus Weimer <[email protected]> Committed: Wed Dec 16 12:17:15 2015 -0800 ---------------------------------------------------------------------- .../reef/javabridge/ContextMessageBridge.java | 8 +++++ .../reef/driver/context/ContextMessage.java | 24 +++++++++++++- .../apache/reef/driver/task/TaskMessage.java | 24 +++++++++++++- .../java/org/apache/reef/io/Numberable.java | 35 ++++++++++++++++++++ .../driver/context/ContextMessageImpl.java | 11 +++++- .../driver/context/ContextRepresenters.java | 3 +- .../driver/evaluator/EvaluatorManager.java | 8 +++-- .../evaluator/pojos/ContextMessagePOJO.java | 17 ++++++++-- .../evaluator/pojos/ContextStatusPOJO.java | 4 +-- .../driver/evaluator/pojos/TaskMessagePOJO.java | 18 ++++++++-- .../driver/evaluator/pojos/TaskStatusPOJO.java | 4 +-- .../common/driver/task/TaskMessageImpl.java | 11 +++++- .../common/driver/task/TaskRepresenter.java | 6 ++-- 13 files changed, 154 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ContextMessageBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ContextMessageBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ContextMessageBridge.java index 9cbdc86..ce473a8 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ContextMessageBridge.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ContextMessageBridge.java @@ -18,6 +18,7 @@ */ package org.apache.reef.javabridge; +import org.apache.commons.lang.NotImplementedException; import org.apache.reef.annotations.audience.Interop; import org.apache.reef.annotations.audience.Private; import org.apache.reef.driver.context.ContextMessage; @@ -62,4 +63,11 @@ public final class ContextMessageBridge extends NativeBridge implements ContextM public String getMessageSourceID() { return messageSourceId; } + + @Override + public long getSequenceNumber(){ + // TODO[REEF-1085] once REEF.NET supports sequence numbers, ensure the numbers + // can propagate between C# and Java implementations + throw new NotImplementedException("A Java-CLR bridge lacks support of sequence numbers on the REEF.NET side."); + } } http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-common/src/main/java/org/apache/reef/driver/context/ContextMessage.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/context/ContextMessage.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/context/ContextMessage.java index b3be7c3..a5912cd 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/context/ContextMessage.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/context/ContextMessage.java @@ -22,6 +22,7 @@ import org.apache.reef.annotations.Provided; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Public; import org.apache.reef.io.Message; +import org.apache.reef.io.Numberable; import org.apache.reef.io.naming.Identifiable; /** @@ -30,7 +31,7 @@ import org.apache.reef.io.naming.Identifiable; @Public @DriverSide @Provided -public interface ContextMessage extends Message, Identifiable { +public interface ContextMessage extends Message, Identifiable, Numberable { /** * @return the message sent by the Context. @@ -49,4 +50,25 @@ public interface ContextMessage extends Message, Identifiable { */ String getMessageSourceID(); + /** + * @return the sequence number of the message + * + * Terminology: a source sends a message to a target. + * Example sources are Tasks or Contexts. Example targets are Drivers. + * + * The method guarantees that sequence numbers increase strictly monotonically per message source. + * So numbers of messages from different sources should not be compared to each other. + * + * Clients of this method must not assume any particular method implementation + * because it may change in the future. + * + * The current implementation returns the timestamp of a heartbeat that contained the message. + * A heartbeat may contain several messages; all such message will get the same sequence number. + * A source can attach only one message to a single heartbeat, so the strict sequence number monotonicity + * per source is guaranteed. + * + */ + @Override + long getSequenceNumber(); + } http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-common/src/main/java/org/apache/reef/driver/task/TaskMessage.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/task/TaskMessage.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/task/TaskMessage.java index ef2e781..b59cc13 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/task/TaskMessage.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/task/TaskMessage.java @@ -22,6 +22,7 @@ import org.apache.reef.annotations.Provided; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Public; import org.apache.reef.io.Message; +import org.apache.reef.io.Numberable; import org.apache.reef.io.naming.Identifiable; /** @@ -30,7 +31,7 @@ import org.apache.reef.io.naming.Identifiable; @DriverSide @Public @Provided -public interface TaskMessage extends Message, Identifiable { +public interface TaskMessage extends Message, Identifiable, Numberable { /** * @return the message. @@ -45,6 +46,27 @@ public interface TaskMessage extends Message, Identifiable { String getId(); /** + * @return the sequence number of the message + * + * Terminology: a source sends a message to a target. + * Example sources are Tasks or Contexts. Example targets are Drivers. + * + * The method guarantees that sequence numbers increase strictly monotonically per message source. + * So numbers of messages from different sources should not be compared to each other. + * + * Clients of this method must not assume any particular method implementation + * because it may change in the future. + * + * The current implementation returns the timestamp of a heartbeat that contained the message. + * A heartbeat may contain several messages; all such message will get the same sequence number. + * A source can attach only one message to a single heartbeat, so the strict sequence number monotonicity + * per source is guaranteed. + * + */ + @Override + long getSequenceNumber(); + + /** * @return the id of the context the sending task is running in. */ String getContextId(); http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-common/src/main/java/org/apache/reef/io/Numberable.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/io/Numberable.java b/lang/java/reef-common/src/main/java/org/apache/reef/io/Numberable.java new file mode 100644 index 0000000..de84217 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/io/Numberable.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.io; + +/** + * Interface for objects which can form ordered sequences, e.g. {@link Message}. + */ +public interface Numberable { + + /** + * + * @return the number of an object in a ordered sequences of similar objects + * + * Guarantees of this method - such as partial or total order - are implementation-specific. + */ + long getSequenceNumber(); + +} http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextMessageImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextMessageImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextMessageImpl.java index 4c79972..89b574e 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextMessageImpl.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextMessageImpl.java @@ -28,14 +28,18 @@ import org.apache.reef.driver.context.ContextMessage; @Private @DriverSide public final class ContextMessageImpl implements ContextMessage { + private final byte[] theMessage; private final String theContextID; private final String theMessageSourceId; + private final long sequenceNumber; - public ContextMessageImpl(final byte[] theMessage, final String theContextID, final String theMessageSourceId) { + public ContextMessageImpl(final byte[] theMessage, final String theContextID, final String theMessageSourceId, + final long sequenceNumber) { this.theMessage = theMessage; this.theContextID = theContextID; this.theMessageSourceId = theMessageSourceId; + this.sequenceNumber = sequenceNumber; } @Override @@ -52,4 +56,9 @@ public final class ContextMessageImpl implements ContextMessage { public String getMessageSourceID() { return this.theMessageSourceId; } + + @Override + public long getSequenceNumber() { + return this.sequenceNumber; + } } http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java index 5a117f9..6892a0c 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java @@ -198,7 +198,8 @@ public final class ContextRepresenters { contextMessage : contextStatus.getContextMessageList()) { final byte[] theMessage = contextMessage.getMessage(); final String sourceID = contextMessage.getSourceId(); - this.messageDispatcher.onContextMessage(new ContextMessageImpl(theMessage, contextID, sourceID)); + final long sequenceNumber = contextMessage.getSequenceNumber(); + this.messageDispatcher.onContextMessage(new ContextMessageImpl(theMessage, contextID, sourceID, sequenceNumber)); } } http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java index cc107c3..fcefb6b 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java @@ -374,6 +374,9 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { } } + // All messages from a heartbeat receive the heartbeat timestamp as a sequence number. See REEF-806. + final long messageSequenceNumber = evaluatorHeartbeatProto.getTimestamp(); + // Process the Evaluator status message if (evaluatorHeartbeatProto.hasEvaluatorStatus()) { EvaluatorStatusPOJO evaluatorStatus = new EvaluatorStatusPOJO(evaluatorHeartbeatProto.getEvaluatorStatus()); @@ -384,16 +387,15 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { final boolean informClientOfNewContexts = !evaluatorHeartbeatProto.hasTaskStatus(); final List<ContextStatusPOJO> contextStatusList = new ArrayList<>(); for (ReefServiceProtos.ContextStatusProto proto : evaluatorHeartbeatProto.getContextStatusList()) { - contextStatusList.add(new ContextStatusPOJO(proto)); + contextStatusList.add(new ContextStatusPOJO(proto, messageSequenceNumber)); } this.contextRepresenters.onContextStatusMessages(contextStatusList, informClientOfNewContexts); // Process the Task status message - if (evaluatorHeartbeatProto.hasTaskStatus()) { - TaskStatusPOJO taskStatus = new TaskStatusPOJO(evaluatorHeartbeatProto.getTaskStatus()); + TaskStatusPOJO taskStatus = new TaskStatusPOJO(evaluatorHeartbeatProto.getTaskStatus(), messageSequenceNumber); this.onTaskStatusMessage(taskStatus); } LOG.log(Level.FINE, "DONE with evaluator heartbeat from Evaluator {0}", this.getId()); http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextMessagePOJO.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextMessagePOJO.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextMessagePOJO.java index 3760719..c5aa0ea 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextMessagePOJO.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextMessagePOJO.java @@ -21,6 +21,7 @@ package org.apache.reef.runtime.common.driver.evaluator.pojos; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; +import org.apache.reef.io.Numberable; import org.apache.reef.proto.ReefServiceProtos; /** @@ -28,15 +29,17 @@ import org.apache.reef.proto.ReefServiceProtos; */ @DriverSide @Private -public final class ContextMessagePOJO { +public final class ContextMessagePOJO implements Numberable { private final byte[] message; private final String sourceId; + private final long sequenceNumber; - ContextMessagePOJO(final ReefServiceProtos.ContextStatusProto.ContextMessageProto proto){ + ContextMessagePOJO(final ReefServiceProtos.ContextStatusProto.ContextMessageProto proto, final long sequenceNumber){ message = proto.getMessage().toByteArray(); sourceId = proto.getSourceId(); + this.sequenceNumber = sequenceNumber; } /** @@ -52,4 +55,14 @@ public final class ContextMessagePOJO { public String getSourceId(){ return sourceId; } + + /** + * @return the sequence number of a message + * + * {@see ContextMessage#getSequenceNumber()} + */ + @Override + public long getSequenceNumber(){ + return this.sequenceNumber; + } } http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextStatusPOJO.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextStatusPOJO.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextStatusPOJO.java index a636ea5..99bf4a1 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextStatusPOJO.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextStatusPOJO.java @@ -41,7 +41,7 @@ public final class ContextStatusPOJO { private final List<ContextMessagePOJO> contextMessages = new ArrayList<>(); - public ContextStatusPOJO(final ReefServiceProtos.ContextStatusProto proto){ + public ContextStatusPOJO(final ReefServiceProtos.ContextStatusProto proto, final long sequenceNumber){ contextId = proto.getContextId(); parentId = proto.hasParentId() ? proto.getParentId() : null; @@ -49,7 +49,7 @@ public final class ContextStatusPOJO { contextState = proto.hasContextState()? getContextStateFromProto(proto.getContextState()) : null; for (final ContextMessageProto contextMessageProto : proto.getContextMessageList()) { - contextMessages.add(new ContextMessagePOJO(contextMessageProto)); + contextMessages.add(new ContextMessagePOJO(contextMessageProto, sequenceNumber)); } } http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskMessagePOJO.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskMessagePOJO.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskMessagePOJO.java index 84bb4d6..be9aa71 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskMessagePOJO.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskMessagePOJO.java @@ -21,6 +21,7 @@ package org.apache.reef.runtime.common.driver.evaluator.pojos; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; +import org.apache.reef.io.Numberable; import org.apache.reef.proto.ReefServiceProtos; /** @@ -28,14 +29,16 @@ import org.apache.reef.proto.ReefServiceProtos; */ @DriverSide @Private -public final class TaskMessagePOJO { +public final class TaskMessagePOJO implements Numberable { private final byte[] message; private final String sourceId; + private final long sequenceNumber; - TaskMessagePOJO(final ReefServiceProtos.TaskStatusProto.TaskMessageProto proto){ + TaskMessagePOJO(final ReefServiceProtos.TaskStatusProto.TaskMessageProto proto, final long sequenceNumber) { message = proto.getMessage().toByteArray(); sourceId = proto.getSourceId(); + this.sequenceNumber = sequenceNumber; } /** @@ -51,4 +54,15 @@ public final class TaskMessagePOJO { public String getSourceId(){ return sourceId; } + + /** + * @return the sequence number of a message + * + * {@see TaskMessage#getSequenceNumber()} + */ + @Override + public long getSequenceNumber(){ + return this.sequenceNumber; + } + } http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java index 66ecdda..f2e3f2d 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java @@ -40,7 +40,7 @@ public final class TaskStatusPOJO { private final byte[] result; private final List<TaskMessagePOJO> taskMessages = new ArrayList<>(); - public TaskStatusPOJO(final ReefServiceProtos.TaskStatusProto proto){ + public TaskStatusPOJO(final ReefServiceProtos.TaskStatusProto proto, final long sequenceNumber){ taskId = proto.getTaskId(); contextId = proto.getContextId(); @@ -48,7 +48,7 @@ public final class TaskStatusPOJO { result = proto.hasResult() ? proto.getResult().toByteArray() : null; for (final TaskMessageProto taskMessageProto : proto.getTaskMessageList()) { - taskMessages.add(new TaskMessagePOJO(taskMessageProto)); + taskMessages.add(new TaskMessagePOJO(taskMessageProto, sequenceNumber)); } } http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskMessageImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskMessageImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskMessageImpl.java index 57ca425..7cfa2d9 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskMessageImpl.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskMessageImpl.java @@ -33,13 +33,16 @@ public final class TaskMessageImpl implements TaskMessage { private final String taskId; private final String contextId; private final String theMessageSourceId; + private final long sequenceNumber; public TaskMessageImpl(final byte[] theMessage, final String taskId, - final String contextId, final String theMessageSourceId) { + final String contextId, final String theMessageSourceId, + final long sequenceNumber) { this.theMessage = theMessage; this.taskId = taskId; this.contextId = contextId; this.theMessageSourceId = theMessageSourceId; + this.sequenceNumber = sequenceNumber; } @Override @@ -61,4 +64,10 @@ public final class TaskMessageImpl implements TaskMessage { public String getMessageSourceID() { return this.theMessageSourceId; } + + @Override + public long getSequenceNumber() { + return this.sequenceNumber; + } + } http://git-wip-us.apache.org/repos/asf/reef/blob/a541312f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java index 89d60a3..fc3d6e5 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java @@ -149,10 +149,10 @@ public final class TaskRepresenter { } for (final TaskMessagePOJO - taskMessageProto : taskStatus.getTaskMessageList()) { + taskMessagePOJO : taskStatus.getTaskMessageList()) { this.messageDispatcher.onTaskMessage( - new TaskMessageImpl(taskMessageProto.getMessage(), - this.taskId, this.context.getId(), taskMessageProto.getSourceId())); + new TaskMessageImpl(taskMessagePOJO.getMessage(), + this.taskId, this.context.getId(), taskMessagePOJO.getSourceId(), taskMessagePOJO.getSequenceNumber())); } }
