http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/cs/ReefService.pb.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/cs/ReefService.pb.cs b/lang/cs/Org.Apache.REEF.Common/protobuf/cs/ReefService.pb.cs
new file mode 100644
index 0000000..383467e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/cs/ReefService.pb.cs
@@ -0,0 +1,393 @@
+//------------------------------------------------------------------------------
+// <auto-generated>
+//     This code was generated by a tool.
+//
+//     Changes to this file may cause incorrect behavior and will be lost if
+//     the code is regenerated.
+// </auto-generated>
+//------------------------------------------------------------------------------
+
+// Generated from: reef_service_protos.proto (protobuf-net contracts; change the .proto and regenerate rather than hand-editing)
+namespace Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto
+
+{
+  [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"FileResourceProto")]
+  public partial class FileResourceProto : global::ProtoBuf.IExtensible
+  {
+    public FileResourceProto() {}
+
+    private FileType _type;
+    [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"type", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+    public FileType type
+    {
+      get { return _type; }
+      set { _type = value; }
+    }
+    private string _name;
+    [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"name", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public string name
+    {
+      get { return _name; }
+      set { _name = value; }
+    }
+    private string _path;
+    [global::ProtoBuf.ProtoMember(3, IsRequired = true, Name=@"path", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public string path
+    {
+      get { return _path; }
+      set { _path = value; }
+    }
+    private global::ProtoBuf.IExtension extensionObject;
+    global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+      { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+  }
+
+  [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"RuntimeErrorProto")]
+  public partial class RuntimeErrorProto : global::ProtoBuf.IExtensible
+  {
+    public RuntimeErrorProto() {}
+
+    private string _name;
+    [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"name", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public string name
+    {
+      get { return _name; }
+      set { _name = value; }
+    }
+    private string _message;
+    [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public string message
+    {
+      get { return _message; }
+      set { _message = value; }
+    }
+    private byte[] _exception = null;
+    [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"exception", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue(null)]
+    public byte[] exception
+    {
+      get { return _exception; }
+      set { _exception = value; }
+    }
+    private string _identifier = "";
+    [global::ProtoBuf.ProtoMember(5, IsRequired = false, Name=@"identifier", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue("")]
+    public string identifier
+    {
+      get { return _identifier; }
+      set { _identifier = value; }
+    }
+    private global::ProtoBuf.IExtension extensionObject;
+    global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+      { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+  }
+
+  [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"JobStatusProto")]
+  public partial class JobStatusProto : global::ProtoBuf.IExtensible
+  {
+    public JobStatusProto() {}
+
+    private string _identifier;
+    [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"identifier", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public string identifier
+    {
+      get { return _identifier; }
+      set { _identifier = value; }
+    }
+    private State _state;
+    [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"state", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+    public State state
+    {
+      get { return _state; }
+      set { _state = value; }
+    }
+    private byte[] _message = null;
+    [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue(null)]
+    public byte[] message
+    {
+      get { return _message; }
+      set { _message = value; }
+    }
+    private byte[] _exception = null;
+    [global::ProtoBuf.ProtoMember(4, IsRequired = false, Name=@"exception", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue(null)]
+    public byte[] exception
+    {
+      get { return _exception; }
+      set { _exception = value; }
+    }
+    private global::ProtoBuf.IExtension extensionObject;
+    global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+      { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+  }
+
+  [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"ContextStatusProto")]
+  public partial class ContextStatusProto : global::ProtoBuf.IExtensible
+  {
+    public ContextStatusProto() {}
+
+    private ContextStatusProto.State _context_state;
+    [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"context_state", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+    public ContextStatusProto.State context_state
+    {
+      get { return _context_state; }
+      set { _context_state = value; }
+    }
+    private string _context_id;
+    [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"context_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public string context_id
+    {
+      get { return _context_id; }
+      set { _context_id = value; }
+    }
+    private string _parent_id = "";
+    [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"parent_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue("")]
+    public string parent_id
+    {
+      get { return _parent_id; }
+      set { _parent_id = value; }
+    }
+    private byte[] _error = null;
+    [global::ProtoBuf.ProtoMember(5, IsRequired = false, Name=@"error", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue(null)]
+    public byte[] error
+    {
+      get { return _error; }
+      set { _error = value; }
+    }
+    private bool _recovery = false; // NOTE(review): "Name = @" spacing differs from the generated members, so this looks hand-added; reef_service_protos.proto declares "optional bool recovery = 6", so regeneration should keep it.
+    [global::ProtoBuf.ProtoMember(6, IsRequired = false, Name = @"recovery", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue(false)]
+    public bool recovery
+    {
+      get { return _recovery; }
+      set { _recovery = value; }
+    }
+    private readonly global::System.Collections.Generic.List<ContextStatusProto.ContextMessageProto> _context_message = new global::System.Collections.Generic.List<ContextStatusProto.ContextMessageProto>();
+    [global::ProtoBuf.ProtoMember(7, Name=@"context_message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public global::System.Collections.Generic.List<ContextStatusProto.ContextMessageProto> context_message
+    {
+      get { return _context_message; }
+    }
+
+  [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"ContextMessageProto")]
+  public partial class ContextMessageProto : global::ProtoBuf.IExtensible
+  {
+    public ContextMessageProto() {}
+
+    private string _source_id;
+    [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"source_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public string source_id
+    {
+      get { return _source_id; }
+      set { _source_id = value; }
+    }
+    private byte[] _message;
+    [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public byte[] message
+    {
+      get { return _message; }
+      set { _message = value; }
+    }
+    private global::ProtoBuf.IExtension extensionObject;
+    global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+      { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+  }
+
+    [global::ProtoBuf.ProtoContract(Name=@"State")]
+    public enum State // Context lifecycle state; a separate type from the namespace-level State enum.
+    {
+
+      [global::ProtoBuf.ProtoEnum(Name=@"READY", Value=0)]
+      READY = 0,
+
+      [global::ProtoBuf.ProtoEnum(Name=@"DONE", Value=1)]
+      DONE = 1,
+
+      [global::ProtoBuf.ProtoEnum(Name=@"FAIL", Value=2)]
+      FAIL = 2
+    }
+
+    private global::ProtoBuf.IExtension extensionObject;
+    global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+      { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+  }
+
+  [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"TaskStatusProto")]
+  public partial class TaskStatusProto : global::ProtoBuf.IExtensible
+  {
+    public TaskStatusProto() {}
+
+    private string _task_id;
+    [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"task_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public string task_id
+    {
+      get { return _task_id; }
+      set { _task_id = value; }
+    }
+    private string _context_id;
+    [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"context_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public string context_id
+    {
+      get { return _context_id; }
+      set { _context_id = value; }
+    }
+    private State _state;
+    [global::ProtoBuf.ProtoMember(3, IsRequired = true, Name=@"state", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+    public State state
+    {
+      get { return _state; }
+      set { _state = value; }
+    }
+    private byte[] _result = null;
+    [global::ProtoBuf.ProtoMember(4, IsRequired = false, Name=@"result", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue(null)]
+    public byte[] result
+    {
+      get { return _result; }
+      set { _result = value; }
+    }
+    private bool _recovery = false; // NOTE(review): formatted unlike the generated members (looks hand-added) - confirm reef_service_protos.proto declares this field (= 5) so regeneration keeps it.
+    [global::ProtoBuf.ProtoMember(5, IsRequired = false, Name = @"recovery", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue(false)]
+    public bool recovery
+    {
+      get { return _recovery; }
+      set { _recovery = value; }
+    }
+    private readonly global::System.Collections.Generic.List<TaskStatusProto.TaskMessageProto> _task_message = new global::System.Collections.Generic.List<TaskStatusProto.TaskMessageProto>();
+    [global::ProtoBuf.ProtoMember(6, Name=@"task_message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public global::System.Collections.Generic.List<TaskStatusProto.TaskMessageProto> task_message
+    {
+      get { return _task_message; }
+    }
+
+  [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"TaskMessageProto")]
+  public partial class TaskMessageProto : global::ProtoBuf.IExtensible
+  {
+    public TaskMessageProto() {}
+
+    private string _source_id;
+    [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"source_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public string source_id
+    {
+      get { return _source_id; }
+      set { _source_id = value; }
+    }
+    private byte[] _message;
+    [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public byte[] message
+    {
+      get { return _message; }
+      set { _message = value; }
+    }
+    private global::ProtoBuf.IExtension extensionObject;
+    global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+      { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+  }
+
+    private global::ProtoBuf.IExtension extensionObject;
+    global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+      { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+  }
+
+  [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"EvaluatorStatusProto")]
+  public partial class EvaluatorStatusProto : global::ProtoBuf.IExtensible
+  {
+    public EvaluatorStatusProto() {}
+
+    private string _evaluator_id;
+    [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"evaluator_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public string evaluator_id
+    {
+      get { return _evaluator_id; }
+      set { _evaluator_id = value; }
+    }
+    private State _state;
+    [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"state", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+    public State state
+    {
+      get { return _state; }
+      set { _state = value; }
+    }
+    private byte[] _error = null;
+    [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"error", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue(null)]
+    public byte[] error
+    {
+      get { return _error; }
+      set { _error = value; }
+    }
+    private global::ProtoBuf.IExtension extensionObject;
+    global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+      { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+  }
+
+    [global::ProtoBuf.ProtoContract(Name=@"State")]
+    public enum State // Namespace-level state used by JobStatusProto, TaskStatusProto and EvaluatorStatusProto.
+    {
+
+      [global::ProtoBuf.ProtoEnum(Name=@"INIT", Value=0)]
+      INIT = 0,
+
+      [global::ProtoBuf.ProtoEnum(Name=@"RUNNING", Value=1)]
+      RUNNING = 1,
+
+      [global::ProtoBuf.ProtoEnum(Name=@"DONE", Value=2)]
+      DONE = 2,
+
+      [global::ProtoBuf.ProtoEnum(Name=@"SUSPEND", Value=3)]
+      SUSPEND = 3,
+
+      [global::ProtoBuf.ProtoEnum(Name=@"FAILED", Value=4)]
+      FAILED = 4,
+
+      [global::ProtoBuf.ProtoEnum(Name=@"KILLED", Value=5)]
+      KILLED = 5
+    }
+
+    [global::ProtoBuf.ProtoContract(Name=@"FileType")]
+    public enum FileType
+    {
+
+      [global::ProtoBuf.ProtoEnum(Name=@"PLAIN", Value=0)]
+      PLAIN = 0,
+
+      [global::ProtoBuf.ProtoEnum(Name=@"LIB", Value=1)]
+      LIB = 1,
+
+      [global::ProtoBuf.ProtoEnum(Name=@"ARCHIVE", Value=2)]
+      ARCHIVE = 2
+    }
+
+    [global::ProtoBuf.ProtoContract(Name=@"SIZE")]
+    public enum SIZE // NOTE(review): SIZE is commented out in reef_service_protos.proto ("Removed in REEF 0.3"), so regenerating will drop this enum - confirm nothing still uses it.
+    {
+
+      [global::ProtoBuf.ProtoEnum(Name=@"SMALL", Value=0)]
+      SMALL = 0,
+
+      [global::ProtoBuf.ProtoEnum(Name=@"MEDIUM", Value=1)]
+      MEDIUM = 1,
+
+      [global::ProtoBuf.ProtoEnum(Name=@"LARGE", Value=2)]
+      LARGE = 2,
+
+      [global::ProtoBuf.ProtoEnum(Name=@"XLARGE", Value=3)]
+      XLARGE = 3
+    }
+
+    [global::ProtoBuf.ProtoContract(Name=@"ProcessType")]
+    public enum ProcessType
+    {
+
+      [global::ProtoBuf.ProtoEnum(Name=@"JVM", Value=0)]
+      JVM = 0,
+
+      [global::ProtoBuf.ProtoEnum(Name=@"CLR", Value=1)]
+      CLR = 1
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/cs/Serializer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/cs/Serializer.cs b/lang/cs/Org.Apache.REEF.Common/protobuf/cs/Serializer.cs
new file mode 100644
index 0000000..6bd90e8
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/cs/Serializer.cs
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.REEF.Utilities;
+
+using ProtoBuf;
+using System;
+using System.Diagnostics.CodeAnalysis;
+using System.Globalization;
+using System.IO;
+using System.Text;
+
+[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1403:FileMayOnlyContainASingleNamespace", Justification = "Serializers for all protobuf types")]
+[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1402:FileMayOnlyContainASingleClass", Justification = "Serializers for all protobuf types")]
+
+namespace Org.Apache.REEF.Common.ProtoBuf.ReefProtocol
+{
+    /// <summary>
+    /// Add serializer/deserializer to REEFMessage
+    /// </summary>
+    public partial class REEFMessage
+    {
+        /// <summary>
+        /// Creates a message that carries the given evaluator heartbeat.
+        /// </summary>
+        public REEFMessage(EvaluatorHeartbeatProto evaluatorHeartbeatProto)
+        {
+            _evaluatorHeartBeat = evaluatorHeartbeatProto;
+        }
+
+        /// <summary>
+        /// Decodes a REEFMessage from its protobuf wire format.
+        /// </summary>
+        public static REEFMessage Deserialize(byte[] bytes)
+        {
+            using (var s = new MemoryStream(bytes))
+            {
+                return Serializer.Deserialize<REEFMessage>(s);
+            }
+        }
+
+        /// <summary>
+        /// Encodes this message in protobuf wire format.
+        /// </summary>
+        public byte[] Serialize()
+        {
+            using (var s = new MemoryStream())
+            {
+                Serializer.Serialize<REEFMessage>(s, this);
+                // ToArray copies exactly the bytes written, not the stream's (larger) internal buffer.
+                return s.ToArray();
+            }
+        }
+    }
+}
+
+namespace Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto
+{
+    /// <summary>
+    /// Add serializer/deserializer to EvaluatorHeartbeatProto
+    /// </summary>
+    public partial class EvaluatorHeartbeatProto
+    {
+        /// <summary>
+        /// Decodes an EvaluatorHeartbeatProto from its protobuf wire format.
+        /// </summary>
+        public static EvaluatorHeartbeatProto Deserialize(byte[] bytes)
+        {
+            using (var s = new MemoryStream(bytes))
+            {
+                return Serializer.Deserialize<EvaluatorHeartbeatProto>(s);
+            }
+        }
+
+        /// <summary>
+        /// Encodes this heartbeat in protobuf wire format.
+        /// </summary>
+        public byte[] Serialize()
+        {
+            using (var s = new MemoryStream())
+            {
+                Serializer.Serialize<EvaluatorHeartbeatProto>(s, this);
+                return s.ToArray();
+            }
+        }
+
+        /// <summary>
+        /// Single-line summary of the heartbeat for logging. Tolerates a partially
+        /// populated message: absent evaluator/task status renders as empty text.
+        /// </summary>
+        public override string ToString()
+        {
+            string evaluatorId = evaluator_status == null ? string.Empty : evaluator_status.evaluator_id;
+            string evaluatorState = evaluator_status == null ? string.Empty : evaluator_status.state.ToString();
+
+            var contextStatus = new StringBuilder();
+            if (context_status != null)
+            {
+                foreach (ContextStatusProto contextStatusProto in context_status)
+                {
+                    // Separate entries so several contexts do not run together.
+                    if (contextStatus.Length > 0)
+                    {
+                        contextStatus.Append("; ");
+                    }
+                    contextStatus.AppendFormat(CultureInfo.InvariantCulture, "evaluator {0} has context {1} in state {2} with recovery flag {3}",
+                        evaluatorId,
+                        contextStatusProto.context_id,
+                        contextStatusProto.context_state,
+                        contextStatusProto.recovery);
+                }
+            }
+
+            // TaskStatusProto.task_message is a readonly, always-initialized list, so only task_status needs a null check.
+            var taskStatusMessage = new StringBuilder();
+            if (task_status != null)
+            {
+                foreach (TaskStatusProto.TaskMessageProto taskMessageProto in task_status.task_message)
+                {
+                    taskStatusMessage.Append(ByteUtilities.ByteArrarysToString(taskMessageProto.message));
+                }
+            }
+
+            return string.Format(CultureInfo.InvariantCulture, "EvaluatorHeartbeatProto: task_id=[{0}], task_status=[{1}], task_message=[{2}], evaluator_status=[{3}], context_status=[{4}], timestamp=[{5}], recoveryFlag =[{6}]",
+                task_status == null ? string.Empty : task_status.task_id,
+                task_status == null ? string.Empty : task_status.state.ToString(),
+                taskStatusMessage,
+                evaluatorState,
+                contextStatus,
+                timestamp,
+                recovery);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/cs/codec/EvaluatorHeartbeatProtoCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/cs/codec/EvaluatorHeartbeatProtoCodec.cs b/lang/cs/Org.Apache.REEF.Common/protobuf/cs/codec/EvaluatorHeartbeatProtoCodec.cs
new file mode 100644
index 0000000..0f997a4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/cs/codec/EvaluatorHeartbeatProtoCodec.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Common.ProtoBuf.ReefProtocol
+{
+    /// <summary>Wake codec that converts an EvaluatorHeartbeatProto to and from its protobuf wire format.</summary>
+    public class EvaluatorHeartbeatProtoCodec : ICodec<EvaluatorHeartbeatProto>
+    {
+        /// <summary>Serializes the complete heartbeat message.</summary>
+        public byte[] Encode(EvaluatorHeartbeatProto obj)
+        {
+            // Encode obj itself: copying only evaluator_status into a new message dropped timestamp, context_status, task_status and recovery.
+            return obj.Serialize();
+        }
+
+        /// <summary>Deserializes bytes produced by <see cref="Encode"/>.</summary>
+        public EvaluatorHeartbeatProto Decode(byte[] data)
+        {
+            return EvaluatorHeartbeatProto.Deserialize(data);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/cs/codec/REEFMessageCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/cs/codec/REEFMessageCodec.cs b/lang/cs/Org.Apache.REEF.Common/protobuf/cs/codec/REEFMessageCodec.cs
new file mode 100644
index 0000000..41109e3
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/cs/codec/REEFMessageCodec.cs
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Common.ProtoBuf.ReefProtocol
+{
+    /// <summary>Wake codec that converts a REEFMessage to and from its protobuf wire format.</summary>
+    public class REEFMessageCodec : ICodec<REEFMessage>
+    {
+        public REEFMessage Decode(byte[] data)
+        {
+            return REEFMessage.Deserialize(data);
+        }
+
+        public byte[] Encode(REEFMessage obj)
+        {
+            return obj.Serialize();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/proto/client_runtime.proto
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/proto/client_runtime.proto b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/client_runtime.proto
new file mode 100644
index 0000000..3d1f927
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/client_runtime.proto
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+option java_package = "org.apache.reef.proto";
+option java_outer_classname = "ClientRuntimeProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "reef_service_protos.proto";
+
+// Messages from REEF Client -> Driver Runtime
+
+message JobSubmissionProto {
+    required string identifier     = 1; // the job identifier
+    required string remote_id      = 2; // the remote identifier
+    required string configuration  = 5; // the runtime configuration
+    required string user_name      = 6; // the user name
+
+    // optional SIZE driver_size = 7; // Removed: SIZE is commented out in reef_service_protos.proto (REEF 0.3 moved to explicit memory sizes, see driver_memory); keep 7 unused.
+    optional int32 driver_memory = 8;
+    optional int32 priority = 9;
+    optional string queue = 10;
+
+    repeated FileResourceProto global_file = 11; // files that should be placed on the driver and all subsequent evaluators
+    repeated FileResourceProto local_File  = 12; // files that should be placed on the driver only
+
+}
+
+enum Signal {
+    SIG_TERMINATE = 1;
+    SIG_SUSPEND   = 2;
+    SIG_RESUME    = 3;
+}
+
+message JobControlProto {
+    required string identifier = 1;
+    optional Signal signal     = 2;
+    optional bytes message     = 3;
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/proto/driver_runtime.proto
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/proto/driver_runtime.proto b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/driver_runtime.proto
new file mode 100644
index 0000000..2b21ac7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/driver_runtime.proto
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+option java_package = "org.apache.reef.proto";
+option java_outer_classname = "DriverRuntimeProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+
+import "reef_service_protos.proto";
+
+// Messages from Driver Runtime -> Driver Process
+
+message DriverProcessRegistrationProto {
+    required string remote_identifier = 1;
+}
+
+
+message NodeDescriptorProto {
+    required string identifier = 1;
+    required string host_name = 2; // e.g., IP address
+    required int32 port = 3; // e.g., IP port
+    required int32 memory_size = 4;
+    optional string rack_name = 5; // e.g., /default-rack
+}
+
+message ResourceAllocationProto {
+    required string identifier = 1; // e.g., the container id, or the thread id
+    required int32 resource_memory = 2; // megabytes
+    required string node_id = 3;
+}
+
+message ResourceStatusProto {
+    required string identifier = 1;
+    required State state = 2; // State enum from reef_service_protos.proto (INIT..KILLED)
+    optional string diagnostics = 3;
+    optional int32 exit_code = 4;
+    optional bool is_from_previous_driver = 5;
+}
+
+message RuntimeStatusProto {
+    required string name = 1; // e.g., local, yarn21
+    required State state = 2; // State enum from reef_service_protos.proto
+    optional RuntimeErrorProto error = 3; // runtime (e.g., YARN) error
+
+    optional int32 outstanding_container_requests = 5; // NOTE(review): field number 4 is unused in this message - confirm it is intentionally retired
+    repeated string container_allocation = 6;
+}
+
+//////////////////////////////////////////////////////
+// Messages from Driver Process -> Driver Runtime
+
+message ResourceRequestProto {
+    // optional SIZE resource_size = 1; // Removed in REEF 0.3 in favor of memory_size.
+    optional int32 memory_size = 2; // Memory size of the evaluator in MB
+    optional int32 priority = 3;
+
+    required int32 resource_count = 5;
+    repeated string node_name = 6; // a list of specific nodes
+    repeated string rack_name = 7; // a list of specific racks
+
+    optional bool relax_locality = 10;
+}
+
+message ResourceReleaseProto {
+    required string identifier = 1;
+}
+
+message ResourceLaunchProto {
+    required string identifier = 1;
+    required string remote_id = 2;
+    required string evaluator_conf = 3;
+    required ProcessType type = 4; // ProcessType (JVM or CLR) from reef_service_protos.proto
+    repeated FileResourceProto file = 10;
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/proto/evaluator_runtime.proto
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/proto/evaluator_runtime.proto b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/evaluator_runtime.proto
new file mode 100644
index 0000000..1415e5c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/evaluator_runtime.proto
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+option java_package = "org.apache.reef.proto";
+option java_outer_classname = "EvaluatorRuntimeProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "reef_service_protos.proto";
+
+// Stop the evaluator
+message StopEvaluatorProto {
+}
+
+// Kill the evaluator
+message KillEvaluatorProto {
+}
+
+// Start a task
+message StartTaskProto {
+    required string context_id = 1;
+    required string configuration = 2;
+}
+
+message AddContextProto {
+    required string parent_context_id = 1;
+    required string context_configuration = 2;
+    optional string service_configuration = 3;
+}
+
+message RemoveContextProto {
+    required string context_id = 1;
+}
+
+// Stop the task
+message StopTaskProto {
+}
+
+// Suspend the task
+message SuspendTaskProto {
+}
+
+/////////////////////////////////////////
+// Message aggregators
+
+message ContextMessageProto { // NOTE: not the same type as ContextStatusProto.ContextMessageProto in reef_service_protos.proto (this one has context_id, that one source_id)
+    required string context_id = 1;
+    required bytes message = 2;
+}
+
+message ContextControlProto {
+    optional bytes task_message = 1;
+    optional ContextMessageProto context_message = 2;
+
+    optional AddContextProto add_context = 5;
+    optional RemoveContextProto remove_context = 6;
+    optional StartTaskProto start_task = 7;
+    optional StopTaskProto stop_task = 8;
+    optional SuspendTaskProto suspend_task = 9;
+}
+
+message EvaluatorHeartbeatProto {
+    required int64 timestamp = 1; // NOTE(review): units are not stated here (presumably epoch milliseconds) - confirm
+    required EvaluatorStatusProto evaluator_status = 2;
+    repeated ContextStatusProto context_status = 3;
+    optional TaskStatusProto task_status = 4;
+    optional bool recovery = 5;
+}
+
+message EvaluatorControlProto {
+    required int64 timestamp = 1;
+    required string identifier = 2;
+
+    optional ContextControlProto context_control = 3;
+    optional KillEvaluatorProto kill_evaluator = 4;
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_protocol.proto
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_protocol.proto b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_protocol.proto
new file mode 100644
index 0000000..6b99415
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_protocol.proto
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import "client_runtime.proto";
+
+import "evaluator_runtime.proto";
+
+import "reef_service_protos.proto";
+
+
+option java_package = "org.apache.reef.proto"; // same package as client_runtime.proto, driver_runtime.proto and evaluator_runtime.proto
+
+option java_generic_services = true;
+
+option java_generate_equals_and_hash = true;
+
+option java_outer_classname = "REEFProtocol";
+
+message REEFMessage {
+    // Messages defined in client_runtime.proto
+    optional JobSubmissionProto jobSubmission = 1;
+    optional JobControlProto jobControl = 2;
+    // Messages defined in reef_service_protos.proto
+    optional RuntimeErrorProto runtimeError = 3;
+    optional JobStatusProto jobStatus = 4;
+    // Messages from evaluator_runtime.proto
+    optional EvaluatorControlProto evaluatorControl = 5;
+    optional EvaluatorHeartbeatProto evaluatorHeartBeat = 6;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_service_protos.proto
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_service_protos.proto b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_service_protos.proto
new file mode 100644
index 0000000..a553ca9
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_service_protos.proto
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +option java_package = "org.apache.reef.reef.proto"; + +option java_outer_classname = "ReefServiceProtos"; + +option java_generic_services = true; + +option java_generate_equals_and_hash = true; + +enum State { + INIT = 0; + RUNNING = 1; + DONE = 2; + SUSPEND = 3; + FAILED = 4; + KILLED = 5; +} + +enum FileType { + PLAIN = 0; + LIB = 1; + ARCHIVE = 2; +} + +// Removed in REEF 0.3 in favor of explicit memory sizes. +// enum SIZE { +// SMALL = 0; +// MEDIUM = 1; +// LARGE = 2; +// XLARGE = 3; +//} + +enum ProcessType { + JVM = 0; + CLR = 1; +} + +message FileResourceProto { + required FileType type = 1; + required string name = 2; + required string path = 3; +} + +message RuntimeErrorProto { + required string name = 1; // e.g., local, yarn21 + required string message = 2; + optional bytes exception = 3; + + optional string identifier = 5; // e.g., evaluator id +} + +message JobStatusProto { + required string identifier = 1; + required State state = 2; + optional bytes message = 3; + optional bytes exception = 4; +} + +message ContextStatusProto { + enum State { + READY = 0; + DONE = 1; + FAIL = 2; + } + required State context_state = 1; + + required string context_id = 2; + optional string parent_id = 3; + + optional bytes error = 5; // when creating the context + + optional bool recovery = 6; + // Context messages + message ContextMessageProto { + required string source_id = 1; + required bytes message = 2; + } + repeated ContextMessageProto context_message = 7; +} + +message TaskStatusProto { + required string task_id = 1; + required string 
context_id = 2; + required State state = 3; + optional bytes result = 4; // e.g., return value from Task.call() + optional bool recovery = 5; + + // TaskMessageSource messages + message TaskMessageProto { + required string source_id = 1; + required bytes message = 2; + } + repeated TaskMessageProto task_message = 6; +} + +message EvaluatorStatusProto { + required string evaluator_id = 1; + required State state = 2; + optional bytes error = 3; +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/MachineStatus.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/MachineStatus.cs b/lang/cs/Org.Apache.REEF.Common/runtime/MachineStatus.cs new file mode 100644 index 0000000..54aca4c --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/runtime/MachineStatus.cs @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+using System;
+using System.Diagnostics;
+using System.Globalization;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Common.Runtime
+{
+    /// <summary>
+    /// Best-effort report of machine-wide and current-process resource usage, read from
+    /// performance counters and from the <see cref="Process"/> object of this process.
+    /// </summary>
+    public class MachineStatus
+    {
+        // Machine-wide CPU usage: Processor / % Processor Time / _Total.
+        private static readonly PerformanceCounter _cpuCounter;
+
+        // Machine-wide available memory, reported in MB: Memory / Available MBytes.
+        private static readonly PerformanceCounter _ramCounter;
+
+        // CPU usage of this process: Process / % Processor Time / <this process's name>.
+        private static readonly PerformanceCounter _processCpuCounter;
+
+        private static readonly Process _process;
+
+        // Set to false permanently after the first failure to read the counters (see ToString).
+        private static bool _checkStatus;
+
+        static MachineStatus()
+        {
+            _checkStatus = true;
+            _process = Process.GetCurrentProcess();
+            string processName = _process.ProcessName;
+
+            // The fields are assigned only here, so each counter is simply created once.
+            _cpuCounter = new PerformanceCounter()
+            {
+                CategoryName = "Processor",
+                CounterName = "% Processor Time",
+                InstanceName = "_Total",
+            };
+
+            _ramCounter = new PerformanceCounter()
+            {
+                CategoryName = "Memory",
+                CounterName = "Available MBytes"
+            };
+
+            _processCpuCounter = new PerformanceCounter()
+            {
+                CategoryName = "Process",
+                CounterName = "% Processor Time",
+                InstanceName = processName
+            };
+        }
+
+        /// <summary>
+        /// Machine-wide CPU usage, formatted with the invariant culture, e.g. "12.5%".
+        /// </summary>
+        public static string CurrentNodeCpuUsage
+        {
+            get
+            {
+                return _cpuCounter.NextValue().ToString(CultureInfo.InvariantCulture) + "%";
+            }
+        }
+
+        /// <summary>
+        /// Available memory on the machine, formatted with the invariant culture, e.g. "2048MB".
+        /// </summary>
+        public static string AvailableMemory
+        {
+            get
+            {
+                return _ramCounter.NextValue().ToString(CultureInfo.InvariantCulture) + "MB";
+            }
+        }
+
+        /// <summary>
+        /// Current working set of this process in MB (10^6 bytes).
+        /// </summary>
+        public static string CurrentProcessMemoryUsage
+        {
+            get
+            {
+                return ToMegabytes(_process.WorkingSet64);
+            }
+        }
+
+        /// <summary>
+        /// Peak working set of this process in MB (10^6 bytes).
+        /// </summary>
+        public static string PeakProcessMemoryUsage
+        {
+            get
+            {
+                return ToMegabytes(_process.PeakWorkingSet64);
+            }
+        }
+
+        /// <summary>
+        /// CPU usage of this process as a percentage of total machine capacity.
+        /// The counter instance is selected by process name. The value may therefore be inaccurate
+        /// when several evaluator processes run on a single machine.
+        /// </summary>
+        public static string CurrentProcessCpuUsage
+        {
+            get
+            {
+                // RawValue is a cumulative tick count, not a percentage. NextValue() returns the
+                // sampled rate. The per-process counter can exceed 100 on multi-core machines, so
+                // normalise by the logical processor count.
+                float usage = _processCpuCounter.NextValue() / Environment.ProcessorCount;
+                return usage.ToString(CultureInfo.InvariantCulture) + "%";
+            }
+        }
+
+        /// <summary>
+        /// Builds a human-readable resource summary. If reading the counters ever throws,
+        /// status checks are disabled for the rest of the process lifetime and a fallback
+        /// message is returned instead.
+        /// </summary>
+        public override string ToString()
+        {
+            string info = "No machine status information retrieved. Could be due to lack of admin right to get the info.";
+            if (_checkStatus)
+            {
+                try
+                {
+                    // Discard cached process information so the working-set figures are current.
+                    _process.Refresh();
+                    info = string.Format(
+                        CultureInfo.InvariantCulture,
+                        "current node is running at [{0}] CPU usage and with [{1}] memory available.{2} current evaluator process is using [{3}] of CPU and [{4}] of memory, with a peak memory usage of [{5}]",
+                        CurrentNodeCpuUsage,
+                        AvailableMemory,
+                        Environment.NewLine,
+                        CurrentProcessCpuUsage,
+                        CurrentProcessMemoryUsage,
+                        PeakProcessMemoryUsage);
+                }
+                catch (Exception e)
+                {
+                    // It only takes one exception to switch the checking off for good.
+                    _checkStatus = false;
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Cannot obtain machine status due to error", Logger.GetLogger(typeof(MachineStatus)));
+
+                    // We do not want to crash the evaluator just because we cannot get the information.
+                    info = "Cannot obtain machine status due to error " + e.Message;
+                }
+            }
+
+            return info;
+        }
+
+        // Formats a byte count as megabytes (10^6 bytes) using the invariant culture, e.g. "123.45MB".
+        private static string ToMegabytes(long bytes)
+        {
+            return ((float)bytes / 1000000.0).ToString(CultureInfo.InvariantCulture) + "MB";
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/Constants.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/Constants.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/Constants.cs
new file mode 100644
index 0000000..97d705b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/Constants.cs
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Evaluator
+{
+    /// <summary>
+    /// Named values and numeric defaults shared by the evaluator code.
+    /// </summary>
+    public class Constants
+    {
+        /// <summary>
+        /// Default heartbeat period in milliseconds.
+        /// NOTE(review): presumably the default for EvaluatorSettings.HeartBeatPeriodInMs; confirm.
+        /// </summary>
+        public const int DefaultEvaluatorHeartbeatPeriodInMs = 4000;
+
+        /// <summary>
+        /// Default heartbeat retry limit.
+        /// NOTE(review): presumably the default for EvaluatorSettings.MaxHeartbeatFailures; confirm.
+        /// </summary>
+        public const int DefaultEvaluatorHeartbeatMaxRetry = 3;
+
+        // Identifier names. Each value equals its constant's name.
+        public const string ApplicationIdentifier = "ApplicationIdentifier";
+
+        public const string EvaluatorIdentifier = "EvaluatorIdentifier";
+
+        public const string ContextIdentifier = "ContextIdentifier";
+
+        // Configuration names. Each value equals its constant's name.
+        public const string RootContextConfiguration = "RootContextConfiguration";
+
+        public const string RootServiceConfiguration = "RootServiceConfiguration";
+
+        public const string TaskConfiguration = "TaskConfiguration";
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorRuntime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorRuntime.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorRuntime.cs
new file mode 100644
index 0000000..217e24d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorRuntime.cs
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.REEF.Common.ProtoBuf.ReefProtocol;
+using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.REEF.Evaluator;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Time;
+using Org.Apache.REEF.Wake.Time.Runtime.Event;
+using System;
+using System.Globalization;
+
+namespace Org.Apache.REEF.Common
+{
+    /// <summary>
+    /// Evaluator-side runtime. It tracks the evaluator state and forwards driver control
+    /// messages to the <see cref="ContextManager"/>. It reports status changes through the
+    /// <see cref="HeartBeatManager"/>.
+    /// </summary>
+    /// <remarks>
+    /// State transitions triggered by runtime start, driver messages and failures are serialized
+    /// by locking the <see cref="HeartBeatManager"/> instance.
+    /// </remarks>
+    public class EvaluatorRuntime : IObserver<RuntimeStart>, IObserver<RuntimeStop>, IObserver<REEFMessage>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorRuntime));
+
+        private readonly string _evaluatorId;
+
+        private readonly ContextManager _contextManager;
+
+        private readonly HeartBeatManager _heartBeatManager;
+
+        private readonly IRemoteManager<REEFMessage> _remoteManager;
+
+        private readonly IClock _clock;
+
+        // Registration of the driver-message observer; disposed in OnNext(RuntimeStop).
+        private readonly IDisposable _evaluatorControlChannel;
+
+        private State _state = State.INIT;
+
+        /// <summary>
+        /// Subscribes to driver messages and schedules the first heartbeat alarm.
+        /// </summary>
+        /// <param name="contextManager">Manages this evaluator's context stack.</param>
+        /// <param name="heartBeatManager">Sends heartbeats. It also supplies the evaluator settings
+        /// (clock, evaluator id, remote manager).</param>
+        [Inject]
+        public EvaluatorRuntime(
+            ContextManager contextManager,
+            HeartBeatManager heartBeatManager)
+        {
+            using (LOGGER.LogFunction("EvaluatorRuntime::EvaluatorRuntime"))
+            {
+                _clock = heartBeatManager.EvaluatorSettings.RuntimeClock;
+                _heartBeatManager = heartBeatManager;
+                _contextManager = contextManager;
+                _evaluatorId = heartBeatManager.EvaluatorSettings.EvalutorId;
+                _remoteManager = heartBeatManager.EvaluatorSettings.RemoteManager;
+
+                ReefMessageProtoObserver driverObserver = new ReefMessageProtoObserver();
+
+                // subscribe to driver proto message
+                driverObserver.Subscribe(o => OnNext(o.Message));
+
+                // register the driver observer
+                _evaluatorControlChannel = _remoteManager.RegisterObserver(driverObserver);
+
+                // start the heartbeat
+                _clock.ScheduleAlarm(0, heartBeatManager);
+            }
+        }
+
+        /// <summary>
+        /// Current evaluator state. It is INIT until the runtime start event arrives.
+        /// </summary>
+        public State State
+        {
+            get
+            {
+                return _state;
+            }
+        }
+
+        /// <summary>
+        /// Handles a control message from the driver. A message addressed to a different evaluator,
+        /// or one received while not RUNNING, moves this evaluator to FAILED via Handle(Exception).
+        /// </summary>
+        public void Handle(EvaluatorControlProto message)
+        {
+            lock (_heartBeatManager)
+            {
+                LOGGER.Log(Level.Info, "Handle Evaluator control message");
+
+                // Static string.Equals treats a missing identifier as a mismatch instead of throwing.
+                if (!string.Equals(message.identifier, _evaluatorId, StringComparison.OrdinalIgnoreCase))
+                {
+                    Handle(new InvalidOperationException(
+                        string.Format(CultureInfo.InvariantCulture, "Identifier mismatch: message for evaluator id[{0}] sent to evaluator id[{1}]", message.identifier, _evaluatorId)));
+                }
+                else if (_state != State.RUNNING)
+                {
+                    Handle(new InvalidOperationException(
+                        string.Format(CultureInfo.InvariantCulture, "Evaluator received a control message but its state is not {0} but rather {1}", State.RUNNING, _state)));
+                }
+                else
+                {
+                    if (message.context_control != null)
+                    {
+                        LOGGER.Log(Level.Info, "Send task control message to ContextManager");
+                        try
+                        {
+                            _contextManager.HandleTaskControl(message.context_control);
+
+                            // Once the last context is gone the evaluator is finished.
+                            if (_contextManager.ContextStackIsEmpty() && _state == State.RUNNING)
+                            {
+                                LOGGER.Log(Level.Info, "Context stack is empty, done");
+                                _state = State.DONE;
+                                _heartBeatManager.OnNext(GetEvaluatorStatus());
+                                _clock.Dispose();
+                            }
+                        }
+                        catch (Exception e)
+                        {
+                            Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
+                            Handle(e);
+                            Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(e.ToString(), e), LOGGER);
+                        }
+                    }
+                    if (message.kill_evaluator != null)
+                    {
+                        LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} has been killed by the driver.", _evaluatorId));
+                        _state = State.KILLED;
+                        _clock.Dispose();
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Builds a status message carrying this evaluator's id and current state.
+        /// </summary>
+        public EvaluatorStatusProto GetEvaluatorStatus()
+        {
+            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator state : {0}", _state));
+            EvaluatorStatusProto evaluatorStatusProto = new EvaluatorStatusProto()
+            {
+                evaluator_id = _evaluatorId,
+                state = _state
+            };
+            return evaluatorStatusProto;
+        }
+
+        /// <summary>
+        /// Moves the evaluator from INIT to RUNNING, starts the context manager and sends a heartbeat.
+        /// Any failure, including starting from a state other than INIT, is reported through
+        /// Handle(Exception), which marks the evaluator FAILED.
+        /// </summary>
+        public void OnNext(RuntimeStart runtimeStart)
+        {
+            // Use the same monitor as the other state transitions in this class. Do not lock on
+            // the evaluator id: a string has weak identity and may be interned and shared.
+            lock (_heartBeatManager)
+            {
+                try
+                {
+                    LOGGER.Log(Level.Info, "Runtime start");
+                    if (_state != State.INIT)
+                    {
+                        var e = new InvalidOperationException("State should be init.");
+                        Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                    }
+                    _state = State.RUNNING;
+                    _contextManager.Start();
+                    _heartBeatManager.OnNext();
+                }
+                catch (Exception e)
+                {
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
+                    Handle(e);
+                }
+            }
+        }
+
+        void IObserver<RuntimeStart>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<REEFMessage>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<REEFMessage>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<RuntimeStop>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<RuntimeStop>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<RuntimeStart>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Shuts down the context manager. A RUNNING evaluator becomes DONE and sends a final
+        /// heartbeat. Then the driver-message registration is released.
+        /// </summary>
+        public void OnNext(RuntimeStop runtimeStop)
+        {
+            LOGGER.Log(Level.Info, "Runtime stop");
+            _contextManager.Dispose();
+
+            if (_state == State.RUNNING)
+            {
+                _state = State.DONE;
+                _heartBeatManager.OnNext();
+            }
+            try
+            {
+                _evaluatorControlChannel.Dispose();
+            }
+            catch (Exception e)
+            {
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Cannot stop evaluator properly", e), Level.Error, "Exception during shut down.", LOGGER);
+            }
+            LOGGER.Log(Level.Info, "EvaluatorRuntime shutdown complete");
+        }
+
+        /// <summary>
+        /// Entry point for messages from the driver. Only the evaluatorControl payload is acted upon.
+        /// </summary>
+        public void OnNext(REEFMessage value)
+        {
+            if (value != null && value.evaluatorControl != null)
+            {
+                LOGGER.Log(Level.Info, "Received a REEFMessage with EvaluatorControl");
+                Handle(value.evaluatorControl);
+            }
+        }
+
+        // Marks the evaluator FAILED and reports the error to the driver in a heartbeat.
+        // Then it disposes the context manager.
+        private void Handle(Exception e)
+        {
+            lock (_heartBeatManager)
+            {
+                LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "evaluator {0} failed with exception", _evaluatorId), e);
+                _state = State.FAILED;
+                string errorMessage = string.Format(
+                    CultureInfo.InvariantCulture,
+                    "failed with error [{0}] with mesage [{1}] and stack trace [{2}]",
+                    e,
+                    e.Message,
+                    e.StackTrace);
+                EvaluatorStatusProto evaluatorStatusProto = new EvaluatorStatusProto()
+                {
+                    evaluator_id = _evaluatorId,
+                    error = ByteUtilities.StringToByteArrays(errorMessage),
+                    state = _state
+                };
+                _heartBeatManager.OnNext(evaluatorStatusProto);
+                _contextManager.Dispose();
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorSettings.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorSettings.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorSettings.cs
new file mode 100644
index 0000000..bc939d9
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorSettings.cs
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Evaluator;
+using Org.Apache.REEF.Common.Evaluator.Context;
+using Org.Apache.REEF.Common.io;
+using Org.Apache.REEF.Common.ProtoBuf.ReefProtocol;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Time;
+using System;
+
+namespace Org.Apache.REEF.Evaluator
+{
+    // TODO: merge with EvaluatorConfigurations class
+    /// <summary>
+    /// Per-evaluator settings and services shared by the runtime components.
+    /// Everything is fixed at construction except <see cref="OperationState"/> and
+    /// <see cref="NameClient"/>, which stay publicly settable.
+    /// </summary>
+    public class EvaluatorSettings
+    {
+        /// <summary>
+        /// Validates and captures the evaluator settings. The operation state starts as OPERATIONAL.
+        /// </summary>
+        /// <exception cref="ArgumentNullException">
+        /// Thrown when evaluatorId is null, empty or whitespace, or when rootContextConfig, clock,
+        /// remoteManager or injecor is null. The applicationId argument is not validated.
+        /// </exception>
+        public EvaluatorSettings(
+            string applicationId,
+            string evaluatorId,
+            int heartbeatPeriodInMs,
+            int maxHeartbeatRetries,
+            ContextConfiguration rootContextConfig,
+            IClock clock,
+            IRemoteManager<REEFMessage> remoteManager,
+            IInjector injecor)
+        {
+            if (string.IsNullOrWhiteSpace(evaluatorId))
+            {
+                throw new ArgumentNullException("evaluatorId");
+            }
+
+            RequireNonNull(rootContextConfig, "rootContextConfig");
+            RequireNonNull(clock, "clock");
+            RequireNonNull(remoteManager, "remoteManager");
+            RequireNonNull(injecor, "injecor");
+
+            ApplicationId = applicationId;
+            EvalutorId = evaluatorId;
+            HeartBeatPeriodInMs = heartbeatPeriodInMs;
+            MaxHeartbeatFailures = maxHeartbeatRetries;
+            RootContextConfig = rootContextConfig;
+            RuntimeClock = clock;
+            RemoteManager = remoteManager;
+            Injector = injecor;
+            OperationState = EvaluatorOperationState.OPERATIONAL;
+        }
+
+        /// <summary>
+        /// OPERATIONAL in normal operation. HeartBeatManager switches it to RECOVERY when the driver
+        /// is considered unreachable.
+        /// </summary>
+        public EvaluatorOperationState OperationState { get; set; }
+
+        /// <summary>
+        /// Identifier of this evaluator (never null or whitespace).
+        /// </summary>
+        public string EvalutorId { get; private set; }
+
+        /// <summary>
+        /// Interval between regular heartbeats, in milliseconds.
+        /// </summary>
+        public int HeartBeatPeriodInMs { get; private set; }
+
+        /// <summary>
+        /// Identifier of the application this evaluator belongs to (may be null).
+        /// </summary>
+        public string ApplicationId { get; private set; }
+
+        /// <summary>
+        /// Number of consecutive heartbeat send failures after which HeartBeatManager enters RECOVERY.
+        /// </summary>
+        public int MaxHeartbeatFailures { get; private set; }
+
+        /// <summary>
+        /// Configuration of the root context.
+        /// </summary>
+        public ContextConfiguration RootContextConfig { get; private set; }
+
+        /// <summary>
+        /// Clock used to schedule heartbeats and other runtime alarms.
+        /// </summary>
+        public IClock RuntimeClock { get; private set; }
+
+        /// <summary>
+        /// Name-service client; may be null. HeartBeatManager restarts it against a new name server
+        /// during recovery.
+        /// </summary>
+        public INameClient NameClient { get; set; }
+
+        /// <summary>
+        /// Remote manager used to exchange REEF messages with the driver.
+        /// </summary>
+        public IRemoteManager<REEFMessage> RemoteManager { get; private set; }
+
+        /// <summary>
+        /// Injector used to resolve additional services (e.g. the driver reconnect implementation).
+        /// </summary>
+        public IInjector Injector { get; private set; }
+
+        // Throws ArgumentNullException naming paramName when value is null.
+        private static void RequireNonNull(object value, string paramName)
+        {
+            if (value == null)
+            {
+                throw new ArgumentNullException(paramName);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/HeartBeatManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/HeartBeatManager.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/HeartBeatManager.cs
new file mode 100644
index 0000000..6d2121e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/HeartBeatManager.cs
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Common.Context; +using Org.Apache.REEF.Common.Evaluator; +using Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto; +using Org.Apache.REEF.Common.ProtoBuf.ReefProtocol; +using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto; +using Org.Apache.REEF.Common.Runtime; +using Org.Apache.REEF.Evaluator; +using Org.Apache.REEF.Utilities; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Wake.Remote.Impl; +using Org.Apache.REEF.Wake.Time; +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Globalization; +using System.Linq; +using System.Net; +using System.Threading; + +namespace Org.Apache.REEF.Common +{ + public class HeartBeatManager : IObserver<Alarm> + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(HeartBeatManager)); + + private static readonly MachineStatus MachineStatus = new MachineStatus(); + + private readonly IRemoteManager<REEFMessage> _remoteManager; + + private readonly IClock _clock; + + private readonly int _heartBeatPeriodInMillSeconds; + + private readonly int _maxHeartbeatRetries = 0; + + private readonly string _evaluatorId; + + private IRemoteIdentifier _remoteId; + + private IObserver<REEFMessage> _observer; + + private int _heartbeatFailures = 0; + + private IDriverConnection _driverConnection; + + private 
EvaluatorSettings _evaluatorSettings; + + // the queue can only contains the following: + // 1. all failed heartbeats (regular and event-based) before entering RECOVERY state + // 2. event-based heartbeats generated in RECOVERY state (since there will be no attempt to send regular heartbeat) + private Queue<EvaluatorHeartbeatProto> _queuedHeartbeats = new Queue<EvaluatorHeartbeatProto>(); + + public HeartBeatManager(EvaluatorSettings settings, IRemoteIdentifier remoteId) + { + using (LOGGER.LogFunction("HeartBeatManager::HeartBeatManager")) + { + _remoteManager = settings.RemoteManager; + _remoteId = remoteId; + _evaluatorId = settings.EvalutorId; + _observer = _remoteManager.GetRemoteObserver(new RemoteEventEndPoint<REEFMessage>(_remoteId)); + _clock = settings.RuntimeClock; + _heartBeatPeriodInMillSeconds = settings.HeartBeatPeriodInMs; + _maxHeartbeatRetries = settings.MaxHeartbeatFailures; + EvaluatorSettings = settings; + MachineStatus.ToString(); // kick start the CPU perf counter + } + } + + [SuppressMessage("StyleCop.CSharp.NamingRules", "SA1307:AccessibleFieldsMustBeginWithUpperCaseLetter", Justification = "Intended to be private, exposed now before using future injection")] + public EvaluatorRuntime _evaluatorRuntime { get; set; } + + [SuppressMessage("StyleCop.CSharp.NamingRules", "SA1307:AccessibleFieldsMustBeginWithUpperCaseLetter", Justification = "Intended to be private, exposed now before using future injection")] + public ContextManager _contextManager { get; set; } + + public EvaluatorSettings EvaluatorSettings + { + get + { + return _evaluatorSettings; + } + + private set + { + _evaluatorSettings = value; + } + } + + public void Send(EvaluatorHeartbeatProto evaluatorHeartbeatProto) + { + lock (_queuedHeartbeats) + { + if (_evaluatorSettings.OperationState == EvaluatorOperationState.RECOVERY) + { + LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "In RECOVERY mode, heartbeat queued as [{0}]. 
", evaluatorHeartbeatProto)); + _queuedHeartbeats.Enqueue(evaluatorHeartbeatProto); + return; + } + + // NOT during recovery, try to send + REEFMessage payload = new REEFMessage(evaluatorHeartbeatProto); + try + { + _observer.OnNext(payload); + _heartbeatFailures = 0; // reset failure counts if we are having intermidtten (not continuous) failures + } + catch (Exception e) + { + if (evaluatorHeartbeatProto.task_status == null || evaluatorHeartbeatProto.task_status.state != State.RUNNING) + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, "Lost communications to driver when no task is running, recovery NOT supported for such scenario", LOGGER); + } + + _heartbeatFailures++; + + _queuedHeartbeats.Enqueue(evaluatorHeartbeatProto); + LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending heartbeat to driver experienced #{0} failure. Hearbeat queued as: [{1}]. ", _heartbeatFailures, evaluatorHeartbeatProto), e); + + if (_heartbeatFailures >= _maxHeartbeatRetries) + { + LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Heartbeat communications to driver reached max of {0} failures. \n==== Driver is considered dead/unreachable. === \n=========== Entering RECOVERY mode. ===========", _heartbeatFailures)); + try + { + _driverConnection = _evaluatorSettings.Injector.GetInstance<IDriverConnection>(); + } + catch (Exception ex) + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Failed to inject the driver reconnect implementation", LOGGER); + } + LOGGER.Log(Level.Info, "instantiate driver reconnect implementation: " + _driverConnection); + _evaluatorSettings.OperationState = EvaluatorOperationState.RECOVERY; + + // clean heartbeat failure + _heartbeatFailures = 0; + } + } + } + } + + /// <summary> + /// Assemble a complete new heartbeat and send it out. 
+        /// </summary>
+        public void OnNext()
+        {
+            LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext()");
+            lock (this)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext()");
+                EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto();
+                LogAndSendTriggeredHeartbeat(heartbeatProto);
+            }
+        }
+
+        /// <summary>
+        /// Called with a specific TaskStatus that must be delivered to the driver.
+        /// The heartbeat carries the current evaluator status, the context manager's current
+        /// context status collection, and the supplied task status.
+        /// </summary>
+        /// <param name="taskStatusProto">Task status to include in the heartbeat.</param>
+        public void OnNext(TaskStatusProto taskStatusProto)
+        {
+            LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext(TaskStatusProto)");
+            lock (this)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(TaskStatusProto)");
+                EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto(
+                    _evaluatorRuntime.GetEvaluatorStatus(),
+                    _contextManager.GetContextStatusCollection(),
+                    Optional<TaskStatusProto>.Of(taskStatusProto));
+                LogAndSendTriggeredHeartbeat(heartbeatProto);
+            }
+        }
+
+        /// <summary>
+        /// Called with a specific ContextStatusProto that must be delivered to the driver.
+        /// The supplied status is placed first, followed by every status the context manager
+        /// currently reports. The heartbeat carries no task status.
+        /// </summary>
+        /// <param name="contextStatusProto">Context status to include in the heartbeat.</param>
+        public void OnNext(ContextStatusProto contextStatusProto)
+        {
+            LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext(ContextStatusProto)");
+            lock (this)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(ContextStatusProto)");
+                List<ContextStatusProto> contextStatusProtos = new List<ContextStatusProto> { contextStatusProto };
+                contextStatusProtos.AddRange(_contextManager.GetContextStatusCollection());
+                EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto(
+                    _evaluatorRuntime.GetEvaluatorStatus(),
+                    contextStatusProtos,
+                    Optional<TaskStatusProto>.Empty());
+                LogAndSendTriggeredHeartbeat(heartbeatProto);
+            }
+        }
+
+        /// <summary>
+        /// Called with a specific EvaluatorStatus that must be delivered to the driver.
+        /// The heartbeat carries only a timestamp and the supplied evaluator status; no
+        /// context or task status is attached.
+        /// </summary>
+        /// <param name="evaluatorStatusProto">Evaluator status to include in the heartbeat.</param>
+        public void OnNext(EvaluatorStatusProto evaluatorStatusProto)
+        {
+            LOGGER.Log(Level.Verbose, "Before acquring lock: HeartbeatManager::OnNext(EvaluatorStatusProto)");
+            lock (this)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(EvaluatorStatusProto)");
+                EvaluatorHeartbeatProto heartbeatProto = new EvaluatorHeartbeatProto()
+                {
+                    timestamp = CurrentTimeMilliSeconds(),
+                    evaluator_status = evaluatorStatusProto
+                };
+                LogAndSendTriggeredHeartbeat(heartbeatProto);
+            }
+        }
+
+        /// <summary>
+        /// Periodic heartbeat callback fired by the clock.
+        /// While the evaluator operation state is OPERATIONAL and the runtime state is RUNNING,
+        /// a full heartbeat is sent. Otherwise the driver connection is queried for driver
+        /// information; if any is returned, <see cref="Recover"/> is invoked. Exceptions from
+        /// that query/recovery path are caught and logged as warnings.
+        /// After either branch completes, the next alarm is scheduled.
+        /// </summary>
+        /// <param name="value">The alarm that fired (not otherwise used).</param>
+        public void OnNext(Alarm value)
+        {
+            LOGGER.Log(Level.Verbose, "Before acquring lock: HeartbeatManager::OnNext(Alarm)");
+            lock (this)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(Alarm)");
+                if (_evaluatorSettings.OperationState == EvaluatorOperationState.OPERATIONAL && _evaluatorRuntime.State == State.RUNNING)
+                {
+                    EvaluatorHeartbeatProto evaluatorHeartbeatProto = GetEvaluatorHeartbeatProto();
+                    LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}. {1}Node Health: {2}", evaluatorHeartbeatProto, Environment.NewLine, MachineStatus.ToString()));
+                    Send(evaluatorHeartbeatProto);
+                }
+                else
+                {
+                    LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Ignoring regular heartbeat since Evaluator operation state is [{0}] and runtime state is [{1}]. ", _evaluatorSettings.OperationState, _evaluatorRuntime.State));
+                    try
+                    {
+                        DriverInformation driverInformation = _driverConnection.GetDriverInformation(_evaluatorSettings.ApplicationId);
+                        if (driverInformation == null)
+                        {
+                            LOGGER.Log(Level.Verbose, "In RECOVERY mode, cannot retrieve driver information, will try again later.");
+                        }
+                        else
+                        {
+                            LOGGER.Log(
+                                Level.Info,
+                                string.Format(CultureInfo.InvariantCulture, "Detect driver restarted at {0} and is running on endpoint {1} with services {2}. Now trying to re-establish connection", driverInformation.DriverStartTime, driverInformation.DriverRemoteIdentifier, driverInformation.NameServerId));
+                            Recover(driverInformation);
+                        }
+                    }
+                    catch (Exception e)
+                    {
+                        // we do not want any exception to stop the query for driver status
+                        Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, LOGGER);
+                    }
+                }
+
+                // Both branches reschedule the next alarm. If Send throws in the first branch,
+                // this line is not reached, exactly as before.
+                _clock.ScheduleAlarm(_heartBeatPeriodInMillSeconds, this);
+            }
+        }
+
+        public void OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        public void OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Returns the current UTC time as milliseconds elapsed since 1970-01-01T00:00:00Z,
+        /// truncated to a whole number.
+        /// </summary>
+        private static long CurrentTimeMilliSeconds()
+        {
+            // Milliseconds counted from Jan 1st, 1970 (UTC); chosen to be compatible with the Java implementation.
+            DateTime jan1St1970 = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
+            return (long)(DateTime.UtcNow - jan1St1970).TotalMilliseconds;
+        }
+
+        /// <summary>
+        /// Logs an explicitly triggered heartbeat at Info level and hands it to <c>Send</c>.
+        /// The OnNext overloads in this class call it while holding <c>lock (this)</c>.
+        /// </summary>
+        /// <param name="heartbeatProto">The heartbeat to log and send.</param>
+        private void LogAndSendTriggeredHeartbeat(EvaluatorHeartbeatProto heartbeatProto)
+        {
+            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto));
+            Send(heartbeatProto);
+        }
+
+        /// <summary>
+        /// Re-establishes communication with a restarted driver and leaves RECOVERY mode.
+        /// It points the remote observer at the driver's new endpoint and restarts the name
+        /// client (if one is configured) against the new name server. It then replays every
+        /// queued heartbeat, with the first marked as a recovery heartbeat. Finally it sets
+        /// the evaluator operation state to OPERATIONAL.
+        /// </summary>
+        /// <param name="driverInformation">Endpoint and name-server information of the restarted driver.</param>
+        /// <remarks>
+        /// A name-server restart failure is logged and does not stop recovery. A failure while
+        /// replaying a heartbeat is passed to <c>Exceptions.CaughtAndThrow</c>. If that call
+        /// rethrows (as its name and the "giving up" message suggest; verify), the operation
+        /// state is not switched back to OPERATIONAL.
+        /// </remarks>
+        private void Recover(DriverInformation driverInformation)
+        {
+            IPEndPoint driverEndpoint = NetUtilities.ParseIpEndpoint(driverInformation.DriverRemoteIdentifier);
+            _remoteId = new SocketRemoteIdentifier(driverEndpoint);
+            _observer = _remoteManager.GetRemoteObserver(new RemoteEventEndPoint<REEFMessage>(_remoteId));
+            lock (_evaluatorSettings)
+            {
+                if (_evaluatorSettings.NameClient != null)
+                {
+                    try
+                    {
+                        LOGGER.Log(Level.Verbose, "Trying to reset and reconnect to name server: " + driverInformation.NameServerId);
+                        _evaluatorSettings.NameClient.Restart(NetUtilities.ParseIpEndpoint(driverInformation.NameServerId));
+                        LOGGER.Log(Level.Info, "Reconnected to name server: " + driverInformation.NameServerId);
+                    }
+                    catch (Exception e)
+                    {
+                        Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
+                    }
+                }
+            }
+
+            lock (_queuedHeartbeats)
+            {
+                bool firstHeartbeatInQueue = true;
+                while (_queuedHeartbeats.Any())
+                {
+                    LOGGER.Log(Level.Info, "Sending cached recovery heartbeats to " + _remoteId);
+                    try
+                    {
+                        if (firstHeartbeatInQueue)
+                        {
+                            // first heartbeat is specially constructed to include the recovery flag
+                            EvaluatorHeartbeatProto recoveryHeartbeat = ConstructRecoveryHeartBeat(_queuedHeartbeats.Dequeue());
+                            LOGGER.Log(Level.Info, "Recovery heartbeat to be sent:" + recoveryHeartbeat);
+                            _observer.OnNext(new REEFMessage(recoveryHeartbeat));
+                            firstHeartbeatInQueue = false;
+                        }
+                        else
+                        {
+                            _observer.OnNext(new REEFMessage(_queuedHeartbeats.Dequeue()));
+                        }
+                    }
+                    catch (Exception e)
+                    {
+                        // we do not handle failures during RECOVERY
+                        Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(
+                            e,
+                            Level.Error,
+                            string.Format(CultureInfo.InvariantCulture, "Hearbeat attempt failed in RECOVERY mode to Driver {0} , giving up...", _remoteId),
+                            LOGGER);
+                    }
+
+                    // Fixed 500 ms pause between replayed heartbeats.
+                    Thread.Sleep(500);
+                }
+            }
+            _evaluatorSettings.OperationState = EvaluatorOperationState.OPERATIONAL;
+            LOGGER.Log(Level.Info, "=========== Exiting RECOVERY mode. ===========");
+        }
+
+        /// <summary>
+        /// Sets the recovery flag, in place, on the heartbeat, on each of its context statuses,
+        /// and on its task status when one is present. Returns the same instance.
+        /// </summary>
+        /// <param name="heartbeat">A queued heartbeat to convert into the recovery heartbeat.</param>
+        /// <returns>The mutated <paramref name="heartbeat"/>.</returns>
+        private EvaluatorHeartbeatProto ConstructRecoveryHeartBeat(EvaluatorHeartbeatProto heartbeat)
+        {
+            heartbeat.recovery = true;
+            heartbeat.context_status.ForEach(c => c.recovery = true);
+
+            // task_status is left unset by OnNext(EvaluatorStatusProto), and by
+            // GetEvaluatorHeartbeatProto when no task status is present, so it may be null.
+            if (heartbeat.task_status != null)
+            {
+                heartbeat.task_status.recovery = true;
+            }
+            return heartbeat;
+        }
+
+        /// <summary>
+        /// Builds a heartbeat from the current evaluator status, the context manager's context
+        /// status collection, and its current (optional) task status.
+        /// </summary>
+        private EvaluatorHeartbeatProto GetEvaluatorHeartbeatProto()
+        {
+            return GetEvaluatorHeartbeatProto(
+                _evaluatorRuntime.GetEvaluatorStatus(),
+                _contextManager.GetContextStatusCollection(),
+                _contextManager.GetTaskStatus());
+        }
+
+        /// <summary>
+        /// Builds a timestamped heartbeat from the given parts.
+        /// </summary>
+        /// <param name="evaluatorStatusProto">Evaluator status to embed.</param>
+        /// <param name="contextStatusProtos">Context statuses, appended in enumeration order.</param>
+        /// <param name="taskStatusProto">Task status; <c>task_status</c> is set only when present.</param>
+        /// <returns>The new heartbeat message.</returns>
+        private EvaluatorHeartbeatProto GetEvaluatorHeartbeatProto(
+            EvaluatorStatusProto evaluatorStatusProto,
+            ICollection<ContextStatusProto> contextStatusProtos,
+            Optional<TaskStatusProto> taskStatusProto)
+        {
+            EvaluatorHeartbeatProto evaluatorHeartbeatProto = new EvaluatorHeartbeatProto()
+            {
+                timestamp = CurrentTimeMilliSeconds(),
+                evaluator_status = evaluatorStatusProto
+            };
+            foreach (ContextStatusProto contextStatusProto in contextStatusProtos)
+            {
+                evaluatorHeartbeatProto.context_status.Add(contextStatusProto);
+            }
+            if (taskStatusProto.IsPresent())
+            {
+                evaluatorHeartbeatProto.task_status = taskStatusProto.Value;
+            }
+            return evaluatorHeartbeatProto;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/ReefMessageProtoObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/ReefMessageProtoObserver.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/ReefMessageProtoObserver.cs
new file mode 100644
index 0000000..5593e08
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/ReefMessageProtoObserver.cs
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.ProtoBuf.ReefProtocol;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote;
+using System;
+using System.Globalization;
+using System.Threading;
+
+namespace Org.Apache.REEF.Common
+{
+    /// <summary>
+    /// Receives <see cref="REEFMessage"/>s, logs each one and a one-line summary of any
+    /// context control payload it carries, and forwards every message to the single
+    /// subscribed observer, if there is one.
+    /// </summary>
+    public class ReefMessageProtoObserver :
+        IObserver<IRemoteMessage<REEFMessage>>,
+        IObservable<IRemoteMessage<REEFMessage>>,
+        IDisposable
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ReefMessageProtoObserver));
+
+        // The single downstream subscriber, or null. Set by Subscribe, cleared by Dispose;
+        // OnNext reads it once into a local before forwarding.
+        private volatile IObserver<IRemoteMessage<REEFMessage>> _observer = null;
+
+        /// <summary>No-op.</summary>
+        public void OnCompleted()
+        {
+        }
+
+        /// <summary>No-op; the error is neither logged nor forwarded.</summary>
+        /// <param name="error">Ignored.</param>
+        public void OnError(Exception error)
+        {
+        }
+
+        /// <summary>
+        /// Logs the received message, then logs what its context control asks for (if the
+        /// message has one), and finally forwards the message to the subscribed observer.
+        /// </summary>
+        /// <param name="value">The received message together with its sender identifier.</param>
+        public void OnNext(IRemoteMessage<REEFMessage> value)
+        {
+            REEFMessage remoteEvent = value.Message;
+            IRemoteIdentifier id = value.Identifier;
+            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "receive a ReefMessage from {0} Driver at {1}.", remoteEvent, id));
+
+            if (remoteEvent.evaluatorControl != null && remoteEvent.evaluatorControl.context_control != null)
+            {
+                var contextControl = remoteEvent.evaluatorControl.context_control;
+
+                string contextMessage = null;
+                string taskMessage = null;
+                if (contextControl.context_message != null)
+                {
+                    contextMessage = contextControl.context_message.ToString();
+                }
+                if (contextControl.task_message != null)
+                {
+                    taskMessage = ByteUtilities.ByteArrarysToString(contextControl.task_message);
+                }
+
+                // Only the first matching kind of control is logged.
+                if (!string.IsNullOrEmpty(contextMessage) || !string.IsNullOrEmpty(taskMessage))
+                {
+                    LOGGER.Log(Level.Info,
+                        string.Format(CultureInfo.InvariantCulture, "Control protobuf with context message [{0}] and task message [{1}]", contextMessage, taskMessage));
+                }
+                else if (contextControl.remove_context != null)
+                {
+                    LOGGER.Log(Level.Info,
+                        string.Format(CultureInfo.InvariantCulture, "Control protobuf to remove context {0}", contextControl.remove_context.context_id));
+                }
+                else if (contextControl.add_context != null)
+                {
+                    LOGGER.Log(Level.Info,
+                        string.Format(CultureInfo.InvariantCulture, "Control protobuf to add a context on top of {0}", contextControl.add_context.parent_context_id));
+                }
+                else if (contextControl.start_task != null)
+                {
+                    LOGGER.Log(Level.Info,
+                        string.Format(CultureInfo.InvariantCulture, "Control protobuf to start an task in {0}", contextControl.start_task.context_id));
+                }
+                else if (contextControl.stop_task != null)
+                {
+                    LOGGER.Log(Level.Info, "Control protobuf to stop task");
+                }
+                else if (contextControl.suspend_task != null)
+                {
+                    LOGGER.Log(Level.Info, "Control protobuf to suspend task");
+                }
+            }
+
+            // Read the volatile field once so a concurrent Dispose cannot null it between check and use.
+            var observer = _observer;
+            if (observer != null)
+            {
+                observer.OnNext(value);
+            }
+        }
+
+        /// <summary>
+        /// Registers the single downstream observer.
+        /// </summary>
+        /// <param name="observer">The observer to receive forwarded messages.</param>
+        /// <returns>
+        /// <c>null</c> if an observer is already registered, in which case the existing one is kept.
+        /// Otherwise this instance, whose <see cref="Dispose"/> clears the subscription.
+        /// </returns>
+        public IDisposable Subscribe(IObserver<IRemoteMessage<REEFMessage>> observer)
+        {
+            if (_observer != null)
+            {
+                return null;
+            }
+            _observer = observer;
+            return this;
+        }
+
+        /// <summary>Clears the current subscriber; subsequent messages are not forwarded.</summary>
+        public void Dispose()
+        {
+            _observer = null;
+        }
+    }
+}
