Repository: reef
Updated Branches:
  refs/heads/master a3ddf2171 -> fe9771096


[REEF-976] Fix broken C# Tests caused by race condition of local RM

This addresses the issue by
  * Having the Driver ACK a DONE Evaluator; the Evaluator shuts down only after receiving the ACK.

JIRA:
  [REEF-976](https://issues.apache.org/jira/browse/REEF-976)

Pull Request:
  This closes #915


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/fe977109
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/fe977109
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/fe977109

Branch: refs/heads/master
Commit: fe97710960a7826a474c45147018bdf33427a1a8
Parents: a3ddf21
Author: Andrew Chung <[email protected]>
Authored: Thu Mar 31 11:35:22 2016 -0700
Committer: Markus Weimer <[email protected]>
Committed: Mon Apr 4 13:07:23 2016 -0700

----------------------------------------------------------------------
 .../ReefProtocol/EvaluatorRunTime.pb.cs         | 25 ++++++++++++++++++++
 .../Runtime/Evaluator/EvaluatorRuntime.cs       | 15 ++++++++++--
 .../Runtime/Evaluator/HeartBeatManager.cs       |  2 +-
 .../driver/evaluator/EvaluatorManager.java      | 11 ++++++++-
 .../src/main/proto/evaluator_runtime.proto      |  6 +++++
 5 files changed, 55 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/fe977109/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/EvaluatorRunTime.pb.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/EvaluatorRunTime.pb.cs 
b/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/EvaluatorRunTime.pb.cs
index 931d221..aaf52e6 100644
--- 
a/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/EvaluatorRunTime.pb.cs
+++ 
b/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/EvaluatorRunTime.pb.cs
@@ -11,6 +11,15 @@
 // Note: requires additional types generated from: reef_service_protos.proto
 namespace Org.Apache.REEF.Common.Protobuf.ReefProtocol
 {
+  [global::System.Serializable, global::ProtoBuf.ProtoContract(Name = 
@"DoneEvaluatorProto")]
+  public partial class DoneEvaluatorProto : global::ProtoBuf.IExtensible
+  {
+      public DoneEvaluatorProto() { }
+  
+      private global::ProtoBuf.IExtension extensionObject;
+      global::ProtoBuf.IExtension 
global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+      { return global::ProtoBuf.Extensible.GetExtensionObject(ref 
extensionObject, createIfMissing); }
+  }
   [global::System.Serializable, 
global::ProtoBuf.ProtoContract(Name=@"StopEvaluatorProto")]
   public partial class StopEvaluatorProto : global::ProtoBuf.IExtensible
   {
@@ -296,6 +305,22 @@ namespace Org.Apache.REEF.Common.Protobuf.ReefProtocol
       get { return _kill_evaluator; }
       set { _kill_evaluator = value; }
     }
+    private StopEvaluatorProto _stop_evaluator = null;
+    [global::ProtoBuf.ProtoMember(5, IsRequired = false, Name = 
@"stop_evaluator", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue(null)]
+    public StopEvaluatorProto stop_evaluator
+    {
+        get { return _stop_evaluator; }
+        set { _stop_evaluator = value; }
+    }
+    private DoneEvaluatorProto _done_evaluator = null;
+    [global::ProtoBuf.ProtoMember(6, IsRequired = false, Name = 
@"done_evaluator", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue(null)]
+    public DoneEvaluatorProto done_evaluator
+    {
+        get { return _done_evaluator; }
+        set { _done_evaluator = value; }
+    }
     private global::ProtoBuf.IExtension extensionObject;
     global::ProtoBuf.IExtension 
global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
       { return global::ProtoBuf.Extensible.GetExtensionObject(ref 
extensionObject, createIfMissing); }

http://git-wip-us.apache.org/repos/asf/reef/blob/fe977109/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
index be26699..cc78815 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
@@ -83,6 +83,18 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
                     Handle(new InvalidOperationException(
                         string.Format(CultureInfo.InvariantCulture, 
"Identifier mismatch: message for evaluator id[{0}] sent to evaluator id[{1}]", 
message.identifier, _evaluatorId)));
                 }
+                else if (_state == State.DONE) // DONE has been reported; only the Driver's ACK is expected now.
+                {
+                    if (message.done_evaluator != null) // The Driver ACKs a DONE Evaluator via done_evaluator, not stop_evaluator.
+                    {
+                        Logger.Log(Level.Info, "Received ACK from Driver, 
shutting down Evaluator.");
+                        _clock.Dispose(); // Disposed here, on ACK, rather than right after the DONE heartbeat is sent.
+
+                        return;
+                    }
+
+                    Handle(new InvalidOperationException("Received a control 
message from Driver after Evaluator is done.")); // Any non-ACK control message in DONE state is an error.
+                }
                 else if (_state != State.RUNNING)
                 {
                     Handle(new InvalidOperationException(
@@ -101,7 +113,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
                                 Logger.Log(Level.Info, "Context stack is 
empty, done");
                                 _state = State.DONE;
                                 _heartBeatManager.OnNext(GetEvaluatorStatus());
-                                _clock.Dispose();
                             }
                         }
                         catch (Exception e)
@@ -123,7 +134,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
 
         public EvaluatorStatusProto GetEvaluatorStatus()
         {
-            Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, 
"Evaluator state : {0}", _state));
+            Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, 
"Evaluator state: {0}", _state));
             EvaluatorStatusProto evaluatorStatusProto = new 
EvaluatorStatusProto
             {
                 evaluator_id = _evaluatorId,

http://git-wip-us.apache.org/repos/asf/reef/blob/fe977109/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
index 3ff7167..becdc7a 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
@@ -138,7 +138,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
                 {
                     if (evaluatorHeartbeatProto.task_status == null || 
evaluatorHeartbeatProto.task_status.state != State.RUNNING)
                     {
-                        
Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, "Lost communications 
to driver when no task is running, recovery NOT supported for such scenario", 
LOGGER);
+                        Utilities.Diagnostics.Exceptions.Throw(e, "Lost 
communications to driver when no task is running, recovery NOT supported for 
such scenario", LOGGER);
                     }
 
                     _heartbeatFailures++;

http://git-wip-us.apache.org/repos/asf/reef/blob/fe977109/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index 9003ca4..e364ff1 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -343,7 +343,7 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
 
     synchronized (this.evaluatorDescriptor) {
       if (this.stateManager.isDoneOrFailedOrKilled()) {
-        LOG.log(Level.FINE, "Ignoring an heartbeat received for Evaluator {0} 
which is already in state {1}.",
+        LOG.log(Level.FINE, "Ignoring a heartbeat received for Evaluator {0} 
which is already in state {1}.",
             new Object[]{this.getId(), this.stateManager});
         return;
       }
@@ -438,6 +438,15 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
   private synchronized void onEvaluatorDone(final EvaluatorStatusPOJO message) 
{
     assert message.getState() == State.DONE;
     LOG.log(Level.FINEST, "Evaluator {0} done.", getId());
+
+    // Send an ACK to the Evaluator.
+    sendEvaluatorControlMessage(
+        EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder()
+            .setTimestamp(System.currentTimeMillis())
+            .setIdentifier(getId())
+            
.setDoneEvaluator(EvaluatorRuntimeProtocol.DoneEvaluatorProto.newBuilder().build())
+            .build());
+
     this.stateManager.setDone();
     this.messageDispatcher.onEvaluatorCompleted(new 
CompletedEvaluatorImpl(this.evaluatorId));
     close();

http://git-wip-us.apache.org/repos/asf/reef/blob/fe977109/lang/java/reef-common/src/main/proto/evaluator_runtime.proto
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/proto/evaluator_runtime.proto 
b/lang/java/reef-common/src/main/proto/evaluator_runtime.proto
index 7d82696..5dee352 100644
--- a/lang/java/reef-common/src/main/proto/evaluator_runtime.proto
+++ b/lang/java/reef-common/src/main/proto/evaluator_runtime.proto
@@ -24,6 +24,10 @@ option java_generate_equals_and_hash = true;
 
 import "reef_service_protos.proto";
 
+// ACK the done message from the evaluator
+message DoneEvaluatorProto {
+}
+
 // Stop the evaluator
 message StopEvaluatorProto {
 }
@@ -89,4 +93,6 @@ message EvaluatorControlProto {
 
     optional ContextControlProto context_control = 3;
     optional KillEvaluatorProto kill_evaluator = 4;
+    optional StopEvaluatorProto stop_evaluator = 5;
+    optional DoneEvaluatorProto done_evaluator = 6;
 }

Reply via email to