Repository: reef Updated Branches: refs/heads/master 2b032719f -> a246ac68b
[REEF-1385] Implement TaskMessage.MessageSourceID in REEF.NET This addressed the issue by * Implementing TaskMessage.MessageSourceID. * Modifying messaging test to check for MessageSourceID. JIRA: [REEF-1385](https://issues.apache.org/jira/browse/REEF-1385) Pull Request: This closes #1010 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/a246ac68 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/a246ac68 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/a246ac68 Branch: refs/heads/master Commit: a246ac68b103e8c6f41c570aeb4af72c1a1a16ba Parents: 2b03271 Author: Andrew Chung <[email protected]> Authored: Wed May 18 17:17:03 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Thu May 19 16:26:59 2016 -0700 ---------------------------------------------------------------------- lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h | 2 ++ .../Org.Apache.REEF.Bridge/TaskMessageClr2Java.cpp | 15 +++++++++++++++ .../Bridge/Clr2java/ITaskMessageClr2Java.cs | 2 ++ .../Bridge/Events/TaskMessage.cs | 6 +++++- lang/cs/Org.Apache.REEF.Driver/Task/ITaskMessage.cs | 5 +++++ .../Functional/Messaging/MessageDriver.cs | 13 +++++++++---- .../Functional/Messaging/MessageTask.cs | 1 - .../apache/reef/javabridge/TaskMessageBridge.java | 8 +++++--- 8 files changed, 43 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/a246ac68/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h index e3fef8a..cf4b947 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h +++ b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h @@ -111,12 +111,14 @@ namespace Org { jobject _jobjectTaskMessage = NULL; JavaVM* _jvm; jstring _jstringId = NULL; + jstring _jstringMessageSourceId = NULL; public: TaskMessageClr2Java(JNIEnv *env, jobject jtaskMessage); ~TaskMessageClr2Java(); !TaskMessageClr2Java(); virtual void OnError(String^ message); virtual String^ GetId(); + virtual String^ GetMessageSourceId(); }; public ref class FailedTaskClr2Java : public IFailedTaskClr2Java { http://git-wip-us.apache.org/repos/asf/reef/blob/a246ac68/lang/cs/Org.Apache.REEF.Bridge/TaskMessageClr2Java.cpp ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/TaskMessageClr2Java.cpp b/lang/cs/Org.Apache.REEF.Bridge/TaskMessageClr2Java.cpp index 22db358..b5cee9d 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/TaskMessageClr2Java.cpp +++ b/lang/cs/Org.Apache.REEF.Bridge/TaskMessageClr2Java.cpp @@ -36,8 +36,14 @@ namespace Org { } _jobjectTaskMessage = reinterpret_cast<jobject>(env->NewGlobalRef(jtaskMessage)); + // Get the Task ID. jclass jclassTaskMessage = env->GetObjectClass(_jobjectTaskMessage); _jstringId = CommonUtilities::GetJObjectId(env, _jobjectTaskMessage, jclassTaskMessage); + + // Get the Task Message Source ID. + jmethodID jmid = env->GetMethodID(jclassTaskMessage, "getMessageSourceId", "()Ljava/lang/String;"); + _jstringMessageSourceId = CommonUtilities::CallGetMethodNewGlobalRef<jstring>(env, _jobjectTaskMessage, jmid); + ManagedLog::LOGGER->LogStop("TaskMessageClr2Java::TaskMessageClr2Java"); } @@ -54,6 +60,10 @@ namespace Org { if (_jstringId != NULL) { env->DeleteGlobalRef(_jstringId); } + + if (_jstringMessageSourceId != NULL) { + env->DeleteGlobalRef(_jstringMessageSourceId); + } } void TaskMessageClr2Java::OnError(String^ message) { @@ -67,6 +77,11 @@ namespace Org { JNIEnv *env = RetrieveEnv(_jvm); return ManagedStringFromJavaString(env, _jstringId); } + + String^ TaskMessageClr2Java::GetMessageSourceId() { + JNIEnv *env = RetrieveEnv(_jvm); + return ManagedStringFromJavaString(env, _jstringMessageSourceId); + } } } } http://git-wip-us.apache.org/repos/asf/reef/blob/a246ac68/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ITaskMessageClr2Java.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ITaskMessageClr2Java.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ITaskMessageClr2Java.cs index fc6a362..0b2be83 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ITaskMessageClr2Java.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ITaskMessageClr2Java.cs @@ -23,5 +23,7 @@ namespace Org.Apache.REEF.Driver.Bridge.Clr2java public interface ITaskMessageClr2Java : IClr2Java { string GetId(); + + string GetMessageSourceId(); } } http://git-wip-us.apache.org/repos/asf/reef/blob/a246ac68/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/TaskMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/TaskMessage.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/TaskMessage.cs index 718d898..ea4c1e9 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/TaskMessage.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/TaskMessage.cs @@ -23,7 +23,6 @@ namespace Org.Apache.REEF.Driver.Bridge.Events { /// <summary> /// TaskMessage which wraps ITaskMessageClr2Java - /// TODO[JIRA REEF-1385]: Implement MessageSourceID. /// </summary> [DataContract] internal sealed class TaskMessage : ITaskMessage @@ -43,6 +42,11 @@ namespace Org.Apache.REEF.Driver.Bridge.Events get { return _taskMessageClr2Java.GetId(); } } + public string MessageSourceId + { + get { return _taskMessageClr2Java.GetMessageSourceId(); } + } + [DataMember] public byte[] Message { http://git-wip-us.apache.org/repos/asf/reef/blob/a246ac68/lang/cs/Org.Apache.REEF.Driver/Task/ITaskMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Task/ITaskMessage.cs b/lang/cs/Org.Apache.REEF.Driver/Task/ITaskMessage.cs index b411a45..1cdbd6a 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Task/ITaskMessage.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Task/ITaskMessage.cs @@ -31,5 +31,10 @@ namespace Org.Apache.REEF.Driver.Task /// The ID of the task that sent the message. /// </summary> string TaskId { get; } + + /// <summary> + /// The message source ID of the TaskMessage. + /// </summary> + string MessageSourceId { get; } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/a246ac68/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageDriver.cs index 3183b0a..2f6aeaa 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageDriver.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageDriver.cs @@ -78,14 +78,19 @@ namespace Org.Apache.REEF.Tests.Functional.Messaging public void OnNext(ITaskMessage taskMessage) { - // TODO[JIRA REEF-1385]: Check the MessageTaskSourceID. var msgReceived = ByteUtilities.ByteArraysToString(taskMessage.Message); if (!msgReceived.Equals(MessageTask.MessageSend)) { - Exceptions.Throw(new Exception("Unexpected message: " + msgReceived), - "Unexpected task message received: " + msgReceived, - Logger); + var errorMessage = "Unexpected task message received: " + msgReceived + ". Expected: " + + MessageTask.MessageSend; + Exceptions.Throw(new Exception(errorMessage), errorMessage, Logger); + } + else if (!taskMessage.MessageSourceId.Equals(MessageTask.MessageTaskSourceId)) + { + var errorMessage = "Unexpected TaskMessage.MessageSourceId received: " + taskMessage.MessageSourceId + + ". Expected: " + MessageTask.MessageTaskSourceId; + Exceptions.Throw(new Exception(errorMessage), errorMessage, Logger); } else { http://git-wip-us.apache.org/repos/asf/reef/blob/a246ac68/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageTask.cs index 60662a9..cfe2483 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageTask.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageTask.cs @@ -36,7 +36,6 @@ namespace Org.Apache.REEF.Tests.Functional.Messaging public const string MessageSend = "MESSAGE:TASK"; - // TODO[JIRA REEF-1385]: Check the MessageTaskSourceID on the Driver side. public const string MessageTaskSourceId = "MessageTaskSourceId"; public const string MessageSentToDriverLog = "Message sent to Driver from Task."; http://git-wip-us.apache.org/repos/asf/reef/blob/a246ac68/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java index dd688b4..ec1156e 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java @@ -32,12 +32,10 @@ import org.apache.reef.io.naming.Identifiable; CsFiles = { "ITaskMessageClr2Java.cs", "TaskMessage.cs" }) public final class TaskMessageBridge extends NativeBridge implements Identifiable { private TaskMessage jtaskMessage; - private String taskId; // we don't really need to pass this around, just have this as place holder for future. public TaskMessageBridge(final TaskMessage taskMessage) { jtaskMessage = taskMessage; - taskId = taskMessage.getId(); } @Override @@ -46,6 +44,10 @@ public final class TaskMessageBridge extends NativeBridge implements Identifiabl @Override public String getId() { - return taskId; + return jtaskMessage.getId(); + } + + public String getMessageSourceId() { + return jtaskMessage.getMessageSourceID(); } }
