Repository: reef Updated Branches: refs/heads/master a3ddf2171 -> fe9771096
[REEF-976] Fix broken C# Tests caused by race condition of local RM. This addressed the issue by having the Driver ACK a DONE Evaluator. JIRA: [REEF-976](https://issues.apache.org/jira/browse/REEF-976) Pull Request: This closes #915 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/fe977109 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/fe977109 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/fe977109 Branch: refs/heads/master Commit: fe97710960a7826a474c45147018bdf33427a1a8 Parents: a3ddf21 Author: Andrew Chung <[email protected]> Authored: Thu Mar 31 11:35:22 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Mon Apr 4 13:07:23 2016 -0700 ---------------------------------------------------------------------- .../ReefProtocol/EvaluatorRunTime.pb.cs | 25 ++++++++++++++++++++ .../Runtime/Evaluator/EvaluatorRuntime.cs | 15 ++++++++++-- .../Runtime/Evaluator/HeartBeatManager.cs | 2 +- .../driver/evaluator/EvaluatorManager.java | 11 ++++++++- .../src/main/proto/evaluator_runtime.proto | 6 +++++ 5 files changed, 55 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/fe977109/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/EvaluatorRunTime.pb.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/EvaluatorRunTime.pb.cs b/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/EvaluatorRunTime.pb.cs index 931d221..aaf52e6 100644 --- a/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/EvaluatorRunTime.pb.cs +++ b/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/EvaluatorRunTime.pb.cs @@ -11,6 +11,15 @@ // Note: requires additional types generated from: reef_service_protos.proto namespace Org.Apache.REEF.Common.Protobuf.ReefProtocol { + [global::System.Serializable, 
global::ProtoBuf.ProtoContract(Name = @"DoneEvaluatorProto")] + public partial class DoneEvaluatorProto : global::ProtoBuf.IExtensible + { + public DoneEvaluatorProto() { } + + private global::ProtoBuf.IExtension extensionObject; + global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing) + { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); } + } [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"StopEvaluatorProto")] public partial class StopEvaluatorProto : global::ProtoBuf.IExtensible { @@ -296,6 +305,22 @@ namespace Org.Apache.REEF.Common.Protobuf.ReefProtocol get { return _kill_evaluator; } set { _kill_evaluator = value; } } + private StopEvaluatorProto _stop_evaluator = null; + [global::ProtoBuf.ProtoMember(5, IsRequired = false, Name = @"stop_evaluator", DataFormat = global::ProtoBuf.DataFormat.Default)] + [global::System.ComponentModel.DefaultValue(null)] + public StopEvaluatorProto stop_evaluator + { + get { return _stop_evaluator; } + set { _stop_evaluator = value; } + } + private DoneEvaluatorProto _done_evaluator = null; + [global::ProtoBuf.ProtoMember(6, IsRequired = false, Name = @"done_evaluator", DataFormat = global::ProtoBuf.DataFormat.Default)] + [global::System.ComponentModel.DefaultValue(null)] + public DoneEvaluatorProto done_evaluator + { + get { return _done_evaluator; } + set { _done_evaluator = value; } + } private global::ProtoBuf.IExtension extensionObject; global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing) { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); } http://git-wip-us.apache.org/repos/asf/reef/blob/fe977109/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs index be26699..cc78815 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs @@ -83,6 +83,18 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator Handle(new InvalidOperationException( string.Format(CultureInfo.InvariantCulture, "Identifier mismatch: message for evaluator id[{0}] sent to evaluator id[{1}]", message.identifier, _evaluatorId))); } + else if (_state == State.DONE) + { + if (message.done_evaluator != null) + { + Logger.Log(Level.Info, "Received ACK from Driver, shutting down Evaluator."); + _clock.Dispose(); + + return; + } + + Handle(new InvalidOperationException("Received a control message from Driver after Evaluator is done.")); + } else if (_state != State.RUNNING) { Handle(new InvalidOperationException( @@ -101,7 +113,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator Logger.Log(Level.Info, "Context stack is empty, done"); _state = State.DONE; _heartBeatManager.OnNext(GetEvaluatorStatus()); - _clock.Dispose(); } } catch (Exception e) @@ -123,7 +134,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator public EvaluatorStatusProto GetEvaluatorStatus() { - Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator state : {0}", _state)); + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator state: {0}", _state)); EvaluatorStatusProto evaluatorStatusProto = new EvaluatorStatusProto { evaluator_id = _evaluatorId, http://git-wip-us.apache.org/repos/asf/reef/blob/fe977109/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs index 3ff7167..becdc7a 100644 --- 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs @@ -138,7 +138,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator { if (evaluatorHeartbeatProto.task_status == null || evaluatorHeartbeatProto.task_status.state != State.RUNNING) { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, "Lost communications to driver when no task is running, recovery NOT supported for such scenario", LOGGER); + Utilities.Diagnostics.Exceptions.Throw(e, "Lost communications to driver when no task is running, recovery NOT supported for such scenario", LOGGER); } _heartbeatFailures++; http://git-wip-us.apache.org/repos/asf/reef/blob/fe977109/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java index 9003ca4..e364ff1 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java @@ -343,7 +343,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { synchronized (this.evaluatorDescriptor) { if (this.stateManager.isDoneOrFailedOrKilled()) { - LOG.log(Level.FINE, "Ignoring an heartbeat received for Evaluator {0} which is already in state {1}.", + LOG.log(Level.FINE, "Ignoring a heartbeat received for Evaluator {0} which is already in state {1}.", new Object[]{this.getId(), this.stateManager}); return; } @@ -438,6 +438,15 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { private synchronized void onEvaluatorDone(final 
EvaluatorStatusPOJO message) { assert message.getState() == State.DONE; LOG.log(Level.FINEST, "Evaluator {0} done.", getId()); + + // Send an ACK to the Evaluator. + sendEvaluatorControlMessage( + EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder() + .setTimestamp(System.currentTimeMillis()) + .setIdentifier(getId()) + .setDoneEvaluator(EvaluatorRuntimeProtocol.DoneEvaluatorProto.newBuilder().build()) + .build()); + this.stateManager.setDone(); this.messageDispatcher.onEvaluatorCompleted(new CompletedEvaluatorImpl(this.evaluatorId)); close(); http://git-wip-us.apache.org/repos/asf/reef/blob/fe977109/lang/java/reef-common/src/main/proto/evaluator_runtime.proto ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/proto/evaluator_runtime.proto b/lang/java/reef-common/src/main/proto/evaluator_runtime.proto index 7d82696..5dee352 100644 --- a/lang/java/reef-common/src/main/proto/evaluator_runtime.proto +++ b/lang/java/reef-common/src/main/proto/evaluator_runtime.proto @@ -24,6 +24,10 @@ option java_generate_equals_and_hash = true; import "reef_service_protos.proto"; +// ACK the done message from the evaluator +message DoneEvaluatorProto { +} + // Stop the evaluator message StopEvaluatorProto { } @@ -89,4 +93,6 @@ message EvaluatorControlProto { optional ContextControlProto context_control = 3; optional KillEvaluatorProto kill_evaluator = 4; + optional StopEvaluatorProto stop_evaluator = 5; + optional DoneEvaluatorProto done_evaluator = 6; }
