Repository: reef
Updated Branches:
  refs/heads/master 2b032719f -> a246ac68b


[REEF-1385] Implement TaskMessage.MessageSourceID in REEF.NET

This addressed the issue by
  * Implementing TaskMessage.MessageSourceID.
  * Modifying messaging test to check for MessageSourceID.

JIRA:
  [REEF-1385](https://issues.apache.org/jira/browse/REEF-1385)

Pull Request:
  This closes #1010


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/a246ac68
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/a246ac68
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/a246ac68

Branch: refs/heads/master
Commit: a246ac68b103e8c6f41c570aeb4af72c1a1a16ba
Parents: 2b03271
Author: Andrew Chung <[email protected]>
Authored: Wed May 18 17:17:03 2016 -0700
Committer: Markus Weimer <[email protected]>
Committed: Thu May 19 16:26:59 2016 -0700

----------------------------------------------------------------------
 lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h        |  2 ++
 .../Org.Apache.REEF.Bridge/TaskMessageClr2Java.cpp   | 15 +++++++++++++++
 .../Bridge/Clr2java/ITaskMessageClr2Java.cs          |  2 ++
 .../Bridge/Events/TaskMessage.cs                     |  6 +++++-
 lang/cs/Org.Apache.REEF.Driver/Task/ITaskMessage.cs  |  5 +++++
 .../Functional/Messaging/MessageDriver.cs            | 13 +++++++++----
 .../Functional/Messaging/MessageTask.cs              |  1 -
 .../apache/reef/javabridge/TaskMessageBridge.java    |  8 +++++---
 8 files changed, 43 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/a246ac68/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h 
b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
index e3fef8a..cf4b947 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
+++ b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
@@ -111,12 +111,14 @@ namespace Org {
                             jobject  _jobjectTaskMessage = NULL;
                             JavaVM* _jvm;
                             jstring _jstringId = NULL;
+                            jstring _jstringMessageSourceId = NULL;
                         public:
                             TaskMessageClr2Java(JNIEnv *env, jobject 
jtaskMessage);
                             ~TaskMessageClr2Java();
                             !TaskMessageClr2Java();
                             virtual void OnError(String^ message);
                             virtual String^ GetId();
+                            virtual String^ GetMessageSourceId();
                         };
 
                         public ref class FailedTaskClr2Java : public 
IFailedTaskClr2Java {

http://git-wip-us.apache.org/repos/asf/reef/blob/a246ac68/lang/cs/Org.Apache.REEF.Bridge/TaskMessageClr2Java.cpp
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/TaskMessageClr2Java.cpp 
b/lang/cs/Org.Apache.REEF.Bridge/TaskMessageClr2Java.cpp
index 22db358..b5cee9d 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/TaskMessageClr2Java.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/TaskMessageClr2Java.cpp
@@ -36,8 +36,14 @@ namespace Org {
                                                  }
                                                  _jobjectTaskMessage = 
reinterpret_cast<jobject>(env->NewGlobalRef(jtaskMessage));
 
+                                                 // Get the Task ID.
                                                  jclass jclassTaskMessage = 
env->GetObjectClass(_jobjectTaskMessage);
                                                  _jstringId = 
CommonUtilities::GetJObjectId(env, _jobjectTaskMessage, jclassTaskMessage);
+                                                 
+                                                 // Get the Task Message 
Source ID.
+                                                 jmethodID jmid = 
env->GetMethodID(jclassTaskMessage, "getMessageSourceId", 
"()Ljava/lang/String;");
+                                                 _jstringMessageSourceId = 
CommonUtilities::CallGetMethodNewGlobalRef<jstring>(env, _jobjectTaskMessage, 
jmid);
+
                                                  
ManagedLog::LOGGER->LogStop("TaskMessageClr2Java::TaskMessageClr2Java");
                                          }
 
@@ -54,6 +60,10 @@ namespace Org {
                                                  if (_jstringId != NULL) {
                                                          
env->DeleteGlobalRef(_jstringId);
                                                  }
+
+                                                 if (_jstringMessageSourceId 
!= NULL) {
+                                                         
env->DeleteGlobalRef(_jstringMessageSourceId);
+                                                 }
                                          }
 
                                          void 
TaskMessageClr2Java::OnError(String^ message) {
@@ -67,6 +77,11 @@ namespace Org {
                                                  JNIEnv *env = 
RetrieveEnv(_jvm);
                                                  return 
ManagedStringFromJavaString(env, _jstringId);
                                          }
+
+                                         String^ 
TaskMessageClr2Java::GetMessageSourceId() {
+                                                 JNIEnv *env = 
RetrieveEnv(_jvm);
+                                                 return 
ManagedStringFromJavaString(env, _jstringMessageSourceId);
+                                         }
                                  }
                          }
                  }

http://git-wip-us.apache.org/repos/asf/reef/blob/a246ac68/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ITaskMessageClr2Java.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ITaskMessageClr2Java.cs 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ITaskMessageClr2Java.cs
index fc6a362..0b2be83 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ITaskMessageClr2Java.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ITaskMessageClr2Java.cs
@@ -23,5 +23,7 @@ namespace Org.Apache.REEF.Driver.Bridge.Clr2java
     public interface ITaskMessageClr2Java : IClr2Java
     {
         string GetId();
+
+        string GetMessageSourceId();
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/a246ac68/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/TaskMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/TaskMessage.cs 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/TaskMessage.cs
index 718d898..ea4c1e9 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/TaskMessage.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/TaskMessage.cs
@@ -23,7 +23,6 @@ namespace Org.Apache.REEF.Driver.Bridge.Events
 {
     /// <summary>
     /// TaskMessage which wraps ITaskMessageClr2Java
-    /// TODO[JIRA REEF-1385]: Implement MessageSourceID.
     /// </summary>
     [DataContract]
     internal sealed class TaskMessage : ITaskMessage
@@ -43,6 +42,11 @@ namespace Org.Apache.REEF.Driver.Bridge.Events
             get { return _taskMessageClr2Java.GetId(); }
         }
 
+        public string MessageSourceId
+        {
+            get { return _taskMessageClr2Java.GetMessageSourceId(); }
+        }
+
         [DataMember]
         public byte[] Message
         {

http://git-wip-us.apache.org/repos/asf/reef/blob/a246ac68/lang/cs/Org.Apache.REEF.Driver/Task/ITaskMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Task/ITaskMessage.cs 
b/lang/cs/Org.Apache.REEF.Driver/Task/ITaskMessage.cs
index b411a45..1cdbd6a 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Task/ITaskMessage.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Task/ITaskMessage.cs
@@ -31,5 +31,10 @@ namespace Org.Apache.REEF.Driver.Task
         /// The ID of the task that sent the message.
         /// </summary>
         string TaskId { get; }
+
+        /// <summary>
+        /// The message source ID of the TaskMessage.
+        /// </summary>
+        string MessageSourceId { get; }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/a246ac68/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageDriver.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageDriver.cs
index 3183b0a..2f6aeaa 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageDriver.cs
@@ -78,14 +78,19 @@ namespace Org.Apache.REEF.Tests.Functional.Messaging
 
         public void OnNext(ITaskMessage taskMessage)
         {
-            // TODO[JIRA REEF-1385]: Check the MessageTaskSourceID.
             var msgReceived = 
ByteUtilities.ByteArraysToString(taskMessage.Message);
 
             if (!msgReceived.Equals(MessageTask.MessageSend))
             {
-                Exceptions.Throw(new Exception("Unexpected message: " + 
msgReceived),
-                    "Unexpected task message received: " + msgReceived,
-                    Logger);
+                var errorMessage = "Unexpected task message received: " + 
msgReceived + ". Expected: " +
+                                   MessageTask.MessageSend;
+                Exceptions.Throw(new Exception(errorMessage), errorMessage, 
Logger);
+            }
+            else if 
(!taskMessage.MessageSourceId.Equals(MessageTask.MessageTaskSourceId))
+            {
+                var errorMessage = "Unexpected TaskMessage.MessageSourceId 
received: " + taskMessage.MessageSourceId +
+                                   ". Expected: " + 
MessageTask.MessageTaskSourceId;
+                Exceptions.Throw(new Exception(errorMessage), errorMessage, 
Logger);
             }
             else
             {

http://git-wip-us.apache.org/repos/asf/reef/blob/a246ac68/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageTask.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageTask.cs
index 60662a9..cfe2483 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageTask.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageTask.cs
@@ -36,7 +36,6 @@ namespace Org.Apache.REEF.Tests.Functional.Messaging
 
         public const string MessageSend = "MESSAGE:TASK";
 
-        // TODO[JIRA REEF-1385]: Check the MessageTaskSourceID on the Driver 
side.
         public const string MessageTaskSourceId = "MessageTaskSourceId";
 
         public const string MessageSentToDriverLog = "Message sent to Driver 
from Task.";

http://git-wip-us.apache.org/repos/asf/reef/blob/a246ac68/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java
 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java
index dd688b4..ec1156e 100644
--- 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java
+++ 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java
@@ -32,12 +32,10 @@ import org.apache.reef.io.naming.Identifiable;
     CsFiles = { "ITaskMessageClr2Java.cs", "TaskMessage.cs" })
 public final class TaskMessageBridge extends NativeBridge implements 
Identifiable {
   private TaskMessage jtaskMessage;
-  private String taskId;
 
   // we don't really need to pass this around, just have this as place holder 
for future.
   public TaskMessageBridge(final TaskMessage taskMessage) {
     jtaskMessage = taskMessage;
-    taskId = taskMessage.getId();
   }
 
   @Override
@@ -46,6 +44,10 @@ public final class TaskMessageBridge extends NativeBridge 
implements Identifiabl
 
   @Override
   public String getId() {
-    return taskId;
+    return jtaskMessage.getId();
+  }
+
+  public String getMessageSourceId() {
+    return jtaskMessage.getMessageSourceID();
   }
 }

Reply via email to