http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/ReefMessageProtoObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/ReefMessageProtoObserver.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/ReefMessageProtoObserver.cs new file mode 100644 index 0000000..d1a9f8d --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/ReefMessageProtoObserver.cs @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
using System;
using System.Globalization;
using System.Threading;
using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Remote;

namespace Org.Apache.REEF.Common.Runtime.Evaluator
{
    /// <summary>
    /// Receives remote <see cref="REEFMessage"/>s, logs a short description of any
    /// evaluator control payload they carry, counts them, and forwards each message
    /// to the single observer registered through <see cref="Subscribe"/>.
    /// </summary>
    public class ReefMessageProtoObserver :
        IObserver<IRemoteMessage<REEFMessage>>,
        IObservable<IRemoteMessage<REEFMessage>>,
        IDisposable
    {
        /// <summary>Number of messages between two throughput measurements.</summary>
        private const int PrintBatchSize = 100000;

        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ReefMessageProtoObserver));
        private volatile IObserver<IRemoteMessage<REEFMessage>> _observer = null;
        private long _count = 0;
        private DateTime _begin;
        private DateTime _origBegin;

        /// <summary>No-op: completion is not propagated to the subscriber.</summary>
        public void OnCompleted()
        {
        }

        /// <summary>No-op: errors are not propagated to the subscriber.</summary>
        /// <param name="error">the reported error (ignored).</param>
        public void OnError(Exception error)
        {
        }

        /// <summary>
        /// Logs the incoming message, updates the message counter and hands the
        /// message to the subscribed observer, if there is one.
        /// </summary>
        /// <param name="value">the remote message that was received.</param>
        public void OnNext(IRemoteMessage<REEFMessage> value)
        {
            REEFMessage remoteEvent = value.Message;
            IRemoteIdentifier id = value.Identifier;
            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "receive a ReefMessage from {0} Driver at {1}.", remoteEvent, id));

            if (remoteEvent.evaluatorControl != null)
            {
                if (remoteEvent.evaluatorControl.context_control != null)
                {
                    string context_message = null;
                    string task_message = null;

                    if (remoteEvent.evaluatorControl.context_control.context_message != null)
                    {
                        context_message = remoteEvent.evaluatorControl.context_control.context_message.ToString();
                    }
                    if (remoteEvent.evaluatorControl.context_control.task_message != null)
                    {
                        task_message = ByteUtilities.ByteArrarysToString(remoteEvent.evaluatorControl.context_control.task_message);
                    }

                    // A context/task message takes precedence; the remaining control
                    // kinds are only reported when neither message text is present.
                    if (!(string.IsNullOrEmpty(context_message) && string.IsNullOrEmpty(task_message)))
                    {
                        LOGGER.Log(Level.Info,
                            string.Format(CultureInfo.InvariantCulture, "Control protobuf with context message [{0}] and task message [{1}]", context_message, task_message));
                    }
                    else if (remoteEvent.evaluatorControl.context_control.remove_context != null)
                    {
                        LOGGER.Log(Level.Info,
                            string.Format(CultureInfo.InvariantCulture, "Control protobuf to remove context {0}", remoteEvent.evaluatorControl.context_control.remove_context.context_id));
                    }
                    else if (remoteEvent.evaluatorControl.context_control.add_context != null)
                    {
                        LOGGER.Log(Level.Info,
                            string.Format(CultureInfo.InvariantCulture, "Control protobuf to add a context on top of {0}", remoteEvent.evaluatorControl.context_control.add_context.parent_context_id));
                    }
                    else if (remoteEvent.evaluatorControl.context_control.start_task != null)
                    {
                        LOGGER.Log(Level.Info,
                            string.Format(CultureInfo.InvariantCulture, "Control protobuf to start an task in {0}", remoteEvent.evaluatorControl.context_control.start_task.context_id));
                    }
                    else if (remoteEvent.evaluatorControl.context_control.stop_task != null)
                    {
                        LOGGER.Log(Level.Info, "Control protobuf to stop task");
                    }
                    else if (remoteEvent.evaluatorControl.context_control.suspend_task != null)
                    {
                        LOGGER.Log(Level.Info, "Control protobuf to suspend task");
                    }
                }
            }

            // Decide "first message" from the atomically incremented value instead of
            // a separate, unsynchronized read of _count.
            long count = Interlocked.Increment(ref _count);
            if (count == 1)
            {
                // UTC avoids bogus intervals when the local clock shifts (e.g. DST).
                _begin = DateTime.UtcNow;
                _origBegin = _begin;
            }

            if (count % PrintBatchSize == 0)
            {
                DateTime end = DateTime.UtcNow;
                double seconds = (end - _begin).TotalSeconds;

                // Skip the report when no measurable time elapsed; dividing by zero
                // would yield infinity, which has no meaningful long representation.
                if (seconds > 0)
                {
                    long eventsPerSecond = (long)(PrintBatchSize / seconds);
                    LOGGER.Log(Level.Verbose,
                        string.Format(CultureInfo.InvariantCulture, "Processed {0} messages, {1} messages per second", count, eventsPerSecond));
                }
                _begin = end;
            }

            // Copy the volatile field once so a concurrent Dispose() cannot null it
            // between the check and the call.
            var observer = _observer;
            if (observer != null)
            {
                observer.OnNext(value);
            }
        }

        /// <summary>
        /// Registers the single downstream observer.
        /// </summary>
        /// <param name="observer">the observer that should receive forwarded messages.</param>
        /// <returns>
        /// this instance (disposing it removes the subscription), or <c>null</c> when an
        /// observer is already registered; callers must be prepared for the null result.
        /// </returns>
        public IDisposable Subscribe(IObserver<IRemoteMessage<REEFMessage>> observer)
        {
            if (_observer != null)
            {
                return null;
            }
            _observer = observer;
            return this;
        }

        /// <summary>Removes the current subscription, if any.</summary>
        public void Dispose()
        {
            _observer = null;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/CloseEventImpl.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/CloseEventImpl.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/CloseEventImpl.cs new file mode 100644 index 0000000..39bda77 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/CloseEventImpl.cs @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
using Org.Apache.REEF.Common.Tasks.Events;
using Org.Apache.REEF.Utilities;

namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
{
    /// <summary>
    /// Close event handed to a running task, optionally carrying a byte payload.
    /// </summary>
    public class CloseEventImpl : ICloseEvent
    {
        /// <summary>Creates a close event without a payload.</summary>
        public CloseEventImpl()
        {
            Value = Optional<byte[]>.Empty();
        }

        /// <summary>Creates a close event wrapping the given payload.</summary>
        /// <param name="bytes">the payload; may be null, in which case the value is empty.</param>
        public CloseEventImpl(byte[] bytes)
        {
            Value = Optional<byte[]>.OfNullable(bytes);
        }

        /// <summary>
        /// The optional payload of this event.
        /// </summary>
        /// <remarks>
        /// Implemented as an auto-property. The previous hand-written accessors
        /// referred to the property itself (getter returned <c>Value</c>, setter
        /// assigned <c>value = Value</c>), so every access recursed until the stack
        /// overflowed and nothing was ever stored.
        /// </remarks>
        public Optional<byte[]> Value { get; set; }

        /// <summary>Returns a short textual form that includes the payload.</summary>
        public override string ToString()
        {
            // Concatenation tolerates a null Value (possible through the public setter).
            return "CloseEvent{value=" + Value + "}";
        }
    }
}
using Org.Apache.REEF.Common.Tasks.Events;
using Org.Apache.REEF.Utilities;

namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
{
    /// <summary>
    /// Message sent from the driver to a task, optionally carrying a byte payload.
    /// </summary>
    public class DriverMessageImpl : IDriverMessage
    {
        private Optional<byte[]> _value;

        /// <summary>Creates a driver message without a payload.</summary>
        public DriverMessageImpl()
        {
            _value = Optional<byte[]>.Empty();
        }

        /// <summary>Creates a driver message wrapping the given payload.</summary>
        /// <param name="bytes">the payload; may be null, in which case the message is empty.</param>
        public DriverMessageImpl(byte[] bytes)
        {
            _value = Optional<byte[]>.OfNullable(bytes);
        }

        /// <summary>The optional payload of this message.</summary>
        public Optional<byte[]> Message
        {
            get
            {
                return _value;
            }
        }

        /// <summary>
        /// Returns a short textual form that includes the payload decoded as a string;
        /// an empty message is rendered with an empty value.
        /// </summary>
        public override string ToString()
        {
            // Only decode when a payload exists: an empty Optional has no byte array
            // to hand to the decoder.
            string text = _value.IsPresent()
                ? ByteUtilities.ByteArrarysToString(_value.Value)
                : string.Empty;
            return "DriverMessage [value=" + text + "]";
        }
    }
}
using Org.Apache.REEF.Common.Tasks.Events;
using Org.Apache.REEF.Utilities;

namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
{
    /// <summary>
    /// Suspend event handed to a running task, optionally carrying a byte payload.
    /// </summary>
    /// <remarks>
    /// NOTE(review): this type implements <see cref="ICloseEvent"/>, not a suspend
    /// interface, so overload resolution treats instances as close events. That looks
    /// unintended, but the suspend interface is not visible here - confirm before changing.
    /// </remarks>
    public class SuspendEventImpl : ICloseEvent
    {
        /// <summary>Creates a suspend event without a payload.</summary>
        public SuspendEventImpl()
        {
            Value = Optional<byte[]>.Empty();
        }

        /// <summary>Creates a suspend event wrapping the given payload.</summary>
        /// <param name="bytes">the payload; may be null, in which case the value is empty.</param>
        public SuspendEventImpl(byte[] bytes)
        {
            Value = Optional<byte[]>.OfNullable(bytes);
        }

        /// <summary>
        /// The optional payload of this event.
        /// </summary>
        /// <remarks>
        /// Implemented as an auto-property. The previous hand-written accessors
        /// referred to the property itself (getter returned <c>Value</c>, setter
        /// assigned <c>value = Value</c>), so every access recursed until the stack
        /// overflowed and nothing was ever stored.
        /// </remarks>
        public Optional<byte[]> Value { get; set; }

        /// <summary>Returns a short textual form that includes the payload.</summary>
        public override string ToString()
        {
            // Concatenation tolerates a null Value (possible through the public setter).
            return "SuspendEvent{value=" + Value + "}";
        }
    }
}
using System;
using Org.Apache.REEF.Tang.Interface;

namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
{
    /// <summary>
    /// Exception that records which task, running in which context, failed.
    /// </summary>
    public class TaskClientCodeException : Exception
    {
        /// <summary>
        /// Builds the exception describing a task failure.
        /// </summary>
        /// <param name="taskId">id of the task that failed.</param>
        /// <param name="contextId">id of the context the task was running in.</param>
        /// <param name="message">human readable error message.</param>
        /// <param name="cause">the underlying exception.</param>
        public TaskClientCodeException(string taskId, string contextId, string message, Exception cause)
            : base(message, cause)
        {
            TaskId = taskId;
            ContextId = contextId;
        }

        /// <summary>Id of the task that failed.</summary>
        public string TaskId { get; private set; }

        /// <summary>Id of the context the failed task was running in.</summary>
        public string ContextId { get; private set; }

        /// <summary>
        /// Placeholder for extracting the task identifier from a configuration;
        /// currently always yields the empty string.
        /// </summary>
        /// <param name="c">the task configuration (currently unused).</param>
        /// <returns>the empty string.</returns>
        public static string GetTaskIdentifier(IConfiguration c)
        {
            // TODO: update after TANG is available
            return string.Empty;
        }
    }
}
using System;
using System.Collections.Generic;
using Org.Apache.REEF.Common.Tasks.Events;

namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
{
    /// <summary>
    /// Holds the task start/stop handlers and notifies them with the configured
    /// start/stop events.
    /// </summary>
    public class TaskLifeCycle
    {
        private readonly HashSet<IObserver<ITaskStop>> _taskStopHandlers;
        private readonly HashSet<IObserver<ITaskStart>> _taskStartHandlers;
        private readonly ITaskStart _taskStart;
        private readonly ITaskStop _taskStop;

        /// <summary>
        /// Creates a life cycle with no handlers and no start/stop events.
        /// </summary>
        public TaskLifeCycle()
            : this(new HashSet<IObserver<ITaskStop>>(), new HashSet<IObserver<ITaskStart>>(), null, null)
        {
        }

        // INJECT
        /// <summary>
        /// Creates a life cycle from the given handlers and events.
        /// </summary>
        /// <param name="taskStopHandlers">handlers notified by <see cref="Stop"/>.</param>
        /// <param name="taskStartHandlers">handlers notified by <see cref="Start"/>.</param>
        /// <param name="taskStart">event passed to the start handlers.</param>
        /// <param name="taskStop">event passed to the stop handlers.</param>
        public TaskLifeCycle(
            HashSet<IObserver<ITaskStop>> taskStopHandlers,
            HashSet<IObserver<ITaskStart>> taskStartHandlers,
            TaskStartImpl taskStart,
            TaskStopImpl taskStop)
        {
            _taskStopHandlers = taskStopHandlers;
            _taskStartHandlers = taskStartHandlers;
            _taskStop = taskStop;
            _taskStart = taskStart;
        }

        /// <summary>Passes the start event to every registered start handler.</summary>
        public void Start()
        {
            foreach (var handler in _taskStartHandlers)
            {
                handler.OnNext(_taskStart);
            }
        }

        /// <summary>Passes the stop event to every registered stop handler.</summary>
        public void Stop()
        {
            foreach (var handler in _taskStopHandlers)
            {
                handler.OnNext(_taskStop);
            }
        }
    }
}
--git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs new file mode 100644 index 0000000..721adf7 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs @@ -0,0 +1,326 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
using System;
using System.Collections.Generic;
using System.Globalization;
using Org.Apache.REEF.Common.Io;
using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Common.Tasks.Events;
using Org.Apache.REEF.Tang.Exceptions;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
{
    /// <summary>
    /// Hosts a single injected <see cref="ITask"/>: runs it, tracks its status and
    /// relays close, suspend and driver-message requests to it.
    /// </summary>
    public class TaskRuntime : IObserver<ICloseEvent>, IObserver<ISuspendEvent>, IObserver<IDriverMessage>
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskRuntime));

        private readonly ITask _task;

        private readonly IInjector _injector;

        // The memento given by the task configuration
        private readonly Optional<byte[]> _memento;

        private readonly HeartBeatManager _heartBeatManager;

        private readonly TaskStatus _currentStatus;

        private readonly INameClient _nameClient;

        /// <summary>
        /// Injects the task (mandatory), its message source and name client (both
        /// optional) and creates the status tracker for it.
        /// </summary>
        /// <param name="taskInjector">injector configured with the task configuration.</param>
        /// <param name="contextId">id of the context the task runs in.</param>
        /// <param name="taskId">id of the task.</param>
        /// <param name="heartBeatManager">heartbeat manager handed to the status tracker.</param>
        /// <param name="memento">optional memento string passed to the task when it is called.</param>
        public TaskRuntime(IInjector taskInjector, string contextId, string taskId, HeartBeatManager heartBeatManager, string memento = null)
        {
            _injector = taskInjector;
            _heartBeatManager = heartBeatManager;

            Optional<ISet<ITaskMessageSource>> messageSources = Optional<ISet<ITaskMessageSource>>.Empty();
            try
            {
                _task = _injector.GetInstance<ITask>();
            }
            catch (Exception e)
            {
                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Unable to inject task.", e), Level.Error, "Unable to inject task.", LOGGER);
            }
            try
            {
                ITaskMessageSource taskMessageSource = _injector.GetInstance<ITaskMessageSource>();
                messageSources = Optional<ISet<ITaskMessageSource>>.Of(new HashSet<ITaskMessageSource>() { taskMessageSource });
            }
            catch (Exception e)
            {
                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Cannot inject task message source with error: " + e.StackTrace, LOGGER);
                // do not rethrow since this is benign
            }
            try
            {
                _nameClient = _injector.GetInstance<INameClient>();
                _heartBeatManager.EvaluatorSettings.NameClient = _nameClient;
            }
            catch (InjectionException)
            {
                LOGGER.Log(Level.Warning, "Cannot inject name client from task configuration.");
                // do not rethrow since user is not required to provide name client
            }

            LOGGER.Log(Level.Info, "task message source injected");
            _currentStatus = new TaskStatus(_heartBeatManager, contextId, taskId, messageSources);
            _memento = memento == null ?
                Optional<byte[]>.Empty() : Optional<byte[]>.Of(ByteUtilities.StringToByteArrays(memento));
        }

        /// <summary>Id of the hosted task.</summary>
        public string TaskId
        {
            get { return _currentStatus.TaskId; }
        }

        /// <summary>Id of the context the task runs in.</summary>
        public string ContextId
        {
            get { return _currentStatus.ContextId; }
        }

        /// <summary>Moves the task status to running.</summary>
        public void Initialize()
        {
            _currentStatus.SetRunning();
        }

        /// <summary>
        /// Run the task
        /// </summary>
        /// <remarks>
        /// Blocks until the task call returns. The task is disposed exactly once,
        /// before the result or the failure is recorded in the status.
        /// </remarks>
        public void Start()
        {
            // Tracks whether the success path already attempted the dispose, so the
            // failure path below never disposes the task a second time.
            bool taskDisposed = false;
            try
            {
                LOGGER.Log(Level.Info, "Call Task");
                if (_currentStatus.IsNotRunning())
                {
                    var e = new InvalidOperationException("TaskRuntime not in Running state, instead it is in state " + _currentStatus.State);
                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
                }
                byte[] result;
                byte[] taskMemento = _memento.IsPresent() ? _memento.Value : null;
                System.Threading.Tasks.Task<byte[]> runTask = new System.Threading.Tasks.Task<byte[]>(() => RunTask(taskMemento));
                try
                {
                    runTask.Start();
                    runTask.Wait();
                }
                catch (Exception e)
                {
                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, "Exception thrown during task running.", LOGGER);
                }
                result = runTask.Result;

                LOGGER.Log(Level.Info, "Task Call Finished");

                // Mark first: if Dispose() itself throws, the catch block must record
                // the failure rather than call Dispose() again.
                taskDisposed = true;
                DisposeTask();

                _currentStatus.SetResult(result);
                if (result != null && result.Length > 0)
                {
                    LOGGER.Log(Level.Info, "Task running result:\r\n" + System.Text.Encoding.Default.GetString(result));
                }
            }
            catch (Exception e)
            {
                if (!taskDisposed)
                {
                    DisposeTask();
                }
                LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Task failed caused by exception [{0}]", e));
                _currentStatus.SetException(e);
            }
        }

        /// <summary>Returns the current state of the task.</summary>
        public TaskState GetTaskState()
        {
            return _currentStatus.State;
        }

        /// <summary>
        /// Called by heartbeat manager
        /// </summary>
        /// <returns> current TaskStatusProto </returns>
        public TaskStatusProto GetStatusProto()
        {
            return _currentStatus.ToProto();
        }

        /// <summary>Tells whether the task has reached a state the status tracker reports as ended.</summary>
        public bool HasEnded()
        {
            return _currentStatus.HasEnded();
        }

        /// <summary>
        /// get ID of the task.
        /// </summary>
        /// <returns>ID of the task.</returns>
        public string GetActicityId()
        {
            return _currentStatus.TaskId;
        }

        /// <summary>
        /// Requests the task to close. Ignored (with a warning) unless the task is running.
        /// </summary>
        /// <param name="message">optional payload of the close request.</param>
        public void Close(byte[] message)
        {
            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to close Task {0}", TaskId));
            if (_currentStatus.IsNotRunning())
            {
                LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to close an task that is in {0} state. Ignored.", _currentStatus.State));
            }
            else
            {
                try
                {
                    OnNext(new CloseEventImpl(message));
                    _currentStatus.SetCloseRequested();
                }
                catch (Exception e)
                {
                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Close.", LOGGER);

                    _currentStatus.SetException(
                        new TaskClientCodeException(TaskId, ContextId, "Error during Close().", e));
                }
            }
        }

        /// <summary>
        /// Requests the task to suspend. Ignored (with a warning) unless the task is running.
        /// </summary>
        /// <param name="message">optional payload of the suspend request.</param>
        public void Suspend(byte[] message)
        {
            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to suspend Task {0}", TaskId));

            if (_currentStatus.IsNotRunning())
            {
                LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to supend an task that is in {0} state. Ignored.", _currentStatus.State));
            }
            else
            {
                try
                {
                    // NOTE(review): which OnNext overload this binds to depends on the
                    // interface SuspendEventImpl implements - confirm it is the suspend one.
                    OnNext(new SuspendEventImpl(message));
                    _currentStatus.SetSuspendRequested();
                }
                catch (Exception e)
                {
                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Suspend.", LOGGER);
                    _currentStatus.SetException(
                        new TaskClientCodeException(TaskId, ContextId, "Error during Suspend().", e));
                }
            }
        }

        /// <summary>
        /// Delivers a driver message to the task. Ignored (with a warning) unless the task is running.
        /// </summary>
        /// <param name="message">payload of the driver message.</param>
        public void Deliver(byte[] message)
        {
            if (_currentStatus.IsNotRunning())
            {
                LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to send a message to an task that is in {0} state. Ignored.", _currentStatus.State));
            }
            else
            {
                try
                {
                    OnNext(new DriverMessageImpl(message));
                }
                catch (Exception e)
                {
                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during message delivery.", LOGGER);
                    _currentStatus.SetException(
                        new TaskClientCodeException(TaskId, ContextId, "Error during message delivery.", e));
                }
            }
        }

        /// <summary>Handles a close event; currently only logs it.</summary>
        /// <param name="value">the close event.</param>
        public void OnNext(ICloseEvent value)
        {
            LOGGER.Log(Level.Info, "TaskRuntime::OnNext(ICloseEvent value)");
            // TODO: send a heartbeat
        }

        void IObserver<ICloseEvent>.OnError(Exception error)
        {
            throw new NotImplementedException();
        }

        void IObserver<IDriverMessage>.OnCompleted()
        {
            throw new NotImplementedException();
        }

        void IObserver<IDriverMessage>.OnError(Exception error)
        {
            throw new NotImplementedException();
        }

        void IObserver<ISuspendEvent>.OnCompleted()
        {
            throw new NotImplementedException();
        }

        void IObserver<ISuspendEvent>.OnError(Exception error)
        {
            throw new NotImplementedException();
        }

        void IObserver<ICloseEvent>.OnCompleted()
        {
            throw new NotImplementedException();
        }

        /// <summary>Handles a suspend event; currently only logs it.</summary>
        /// <param name="value">the suspend event.</param>
        public void OnNext(ISuspendEvent value)
        {
            LOGGER.Log(Level.Info, "TaskRuntime::OnNext(ISuspendEvent value)");
            // TODO: send a heartbeat
        }

        /// <summary>
        /// Injects the driver message handler and lets it handle the message. A
        /// failure inside the handler is recorded in the task status without a heartbeat.
        /// </summary>
        /// <param name="value">the driver message.</param>
        public void OnNext(IDriverMessage value)
        {
            IDriverMessageHandler messageHandler = null;
            LOGGER.Log(Level.Info, "TaskRuntime::OnNext(IDriverMessage value)");
            try
            {
                messageHandler = _injector.GetInstance<IDriverMessageHandler>();
            }
            catch (Exception e)
            {
                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, "Received Driver message, but unable to inject handler for driver message ", LOGGER);
            }
            if (messageHandler != null)
            {
                try
                {
                    messageHandler.Handle(value);
                }
                catch (Exception e)
                {
                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Exception throw when handling driver message: " + e, LOGGER);
                    _currentStatus.RecordExecptionWithoutHeartbeat(e);
                }
            }
        }

        /// <summary>Invokes the task with the given memento and returns its result.</summary>
        private byte[] RunTask(byte[] memento)
        {
            return _task.Call(memento);
        }

        /// <summary>Disposes the hosted task if one was injected.</summary>
        private void DisposeTask()
        {
            if (_task != null)
            {
                _task.Dispose();
            }
        }
    }
}
using Org.Apache.REEF.Common.Tasks.Events;

namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
{
    /// <summary>
    /// Task start event that carries an id.
    /// </summary>
    public class TaskStartImpl : ITaskStart
    {
        private string _id;

        //INJECT
        /// <summary>Creates the start event with the given id.</summary>
        /// <param name="id">the id carried by this event.</param>
        public TaskStartImpl(string id)
        {
            _id = id;
        }

        /// <summary>The id carried by this event.</summary>
        public string Id
        {
            get { return _id; }
            set { _id = value; }
        }
    }
}
namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
{
    /// <summary>
    /// States a task can be in. The numeric values are fixed explicitly so that
    /// reordering or inserting members cannot silently change them.
    /// </summary>
    public enum TaskState
    {
        /// <summary>Initial state (also the default value of the enum).</summary>
        Init = 0,
        /// <summary>The task is running.</summary>
        Running = 1,
        /// <summary>A close was requested for the task.</summary>
        CloseRequested = 2,
        /// <summary>A suspend was requested for the task.</summary>
        SuspendRequested = 3,
        /// <summary>The task was suspended.</summary>
        Suspended = 4,
        /// <summary>The task failed.</summary>
        Failed = 5,
        /// <summary>The task is done.</summary>
        Done = 6,
        /// <summary>The task was killed.</summary>
        Killed = 7
    }
}
using System;
using System.Collections.Generic;
using System.Globalization;
using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
{
    /// <summary>
    /// Holds the evaluator-side status of a single task: its <see cref="TaskState"/>,
    /// the result bytes (if any) and the last recorded exception (if any).
    /// Several operations convert the status to a <c>TaskStatusProto</c> and hand it
    /// to the <c>HeartBeatManager</c> passed to the constructor.
    /// </summary>
    public class TaskStatus
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskStatus));

        private readonly TaskLifeCycle _taskLifeCycle;
        private readonly HeartBeatManager _heartBeatManager;
        private readonly Optional<ISet<ITaskMessageSource>> _evaluatorMessageSources;

        private string _taskId;
        private string _contextId;
        private Optional<Exception> _lastException = Optional<Exception>.Empty();
        private Optional<byte[]> _result = Optional<byte[]>.Empty();
        private TaskState _state;

        /// <summary>
        /// Creates a status object in the <see cref="TaskState.Init"/> state.
        /// </summary>
        /// <param name="heartBeatManager">Receiver of the status protos produced by this object.</param>
        /// <param name="contextId">Value reported as <c>context_id</c> in the proto.</param>
        /// <param name="taskId">Value reported as <c>task_id</c> in the proto.</param>
        /// <param name="evaluatorMessageSources">Optional set of sources polled for task messages while running.</param>
        public TaskStatus(HeartBeatManager heartBeatManager, string contextId, string taskId, Optional<ISet<ITaskMessageSource>> evaluatorMessageSources)
        {
            _heartBeatManager = heartBeatManager;
            _evaluatorMessageSources = evaluatorMessageSources;
            _taskLifeCycle = new TaskLifeCycle();
            _contextId = contextId;
            _taskId = taskId;

            // Assigned through the property so the transition check is applied (Init -> Init is legal).
            State = TaskState.Init;
        }

        /// <summary>
        /// Current state. The setter only accepts transitions approved by
        /// <c>IsLegalStateTransition</c>; any other transition is logged at Error level and
        /// reported through <c>Exceptions.Throw</c> as an <see cref="InvalidOperationException"/>,
        /// leaving the stored state unchanged.
        /// </summary>
        public TaskState State
        {
            get
            {
                return _state;
            }

            set
            {
                if (!IsLegalStateTransition(_state, value))
                {
                    string message = string.Format(CultureInfo.InvariantCulture, "Illegal state transition from [{0}] to [{1}]", _state, value);
                    LOGGER.Log(Level.Error, message);
                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(message), LOGGER);

                    // Never store an illegal state, even if the helper above were to return.
                    return;
                }

                _state = value;
            }
        }

        /// <summary>Task id given to the constructor.</summary>
        public string TaskId
        {
            get { return _taskId; }
        }

        /// <summary>Context id given to the constructor.</summary>
        public string ContextId
        {
            get { return _contextId; }
        }

        /// <summary>
        /// Records the exception (moving to <see cref="TaskState.Failed"/>), sends a heartbeat
        /// that carries it, and then clears the stored exception.
        /// </summary>
        /// <param name="e">The exception to report.</param>
        public void SetException(Exception e)
        {
            RecordExecptionWithoutHeartbeat(e);
            Heartbeat();

            // Cleared only after the heartbeat so that the heartbeat still contains the exception.
            _lastException = Optional<Exception>.Empty();
        }

        /// <summary>
        /// Stores the task result, advances the state (Running/CloseRequested -> Done,
        /// SuspendRequested -> Suspended, any other state is left as is), stops the task
        /// life cycle and sends a heartbeat.
        /// </summary>
        /// <param name="result">Result bytes; may be null, in which case the stored result is whatever <c>Optional.OfNullable</c> yields.</param>
        public void SetResult(byte[] result)
        {
            _result = Optional<byte[]>.OfNullable(result);

            switch (State)
            {
                case TaskState.Running:
                case TaskState.CloseRequested:
                    State = TaskState.Done;
                    break;
                case TaskState.SuspendRequested:
                    State = TaskState.Suspended;
                    break;
            }

            _taskLifeCycle.Stop();
            Heartbeat();
        }

        /// <summary>
        /// Moves an <see cref="TaskState.Init"/> task to <see cref="TaskState.Running"/>.
        /// Does nothing (apart from a Verbose log entry) in any other state. Any exception
        /// raised on the way is logged and reported via <see cref="SetException"/>.
        /// </summary>
        public void SetRunning()
        {
            LOGGER.Log(Level.Verbose, "TaskStatus::SetRunning");
            if (_state != TaskState.Init)
            {
                return;
            }

            try
            {
                _taskLifeCycle.Start();

                // The heartbeat is sent while the state is still Init; the driver uses it
                // as the prompt to create its RunningTask event.
                LOGGER.Log(Level.Info, "Sending task INIT heartbeat");
                Heartbeat();
                State = TaskState.Running;
            }
            catch (Exception e)
            {
                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Cannot set task status to running.", LOGGER);
                SetException(e);
            }
        }

        /// <summary>Requests the <see cref="TaskState.CloseRequested"/> state; no heartbeat is sent.</summary>
        public void SetCloseRequested()
        {
            State = TaskState.CloseRequested;
        }

        /// <summary>Requests the <see cref="TaskState.SuspendRequested"/> state; no heartbeat is sent.</summary>
        public void SetSuspendRequested()
        {
            State = TaskState.SuspendRequested;
        }

        /// <summary>Moves to <see cref="TaskState.Killed"/> and sends a heartbeat.</summary>
        public void SetKilled()
        {
            State = TaskState.Killed;
            Heartbeat();
        }

        /// <summary>True for every state other than <see cref="TaskState.Running"/>.</summary>
        public bool IsNotRunning()
        {
            return _state != TaskState.Running;
        }

        /// <summary>True when the state is Done, Suspended, Failed or Killed.</summary>
        public bool HasEnded()
        {
            return _state == TaskState.Done
                || _state == TaskState.Suspended
                || _state == TaskState.Failed
                || _state == TaskState.Killed;
        }

        /// <summary>
        /// Builds the proto for the current status. The payload is, in order of precedence:
        /// the result bytes, the bytes of the last exception's <c>ToString()</c> text, or
        /// (only while Running) the messages collected from the message sources.
        /// Note that the call first runs <c>Check()</c>, which can change state and drop the result.
        /// </summary>
        /// <returns>A freshly created <c>TaskStatusProto</c>.</returns>
        public TaskStatusProto ToProto()
        {
            Check();

            TaskStatusProto proto = new TaskStatusProto();
            proto.context_id = _contextId;
            proto.task_id = _taskId;
            proto.state = GetProtoState();

            if (_result.IsPresent())
            {
                proto.result = ByteUtilities.CopyBytesFrom(_result.Value);
                return proto;
            }

            if (_lastException.IsPresent())
            {
                // The exception travels as the bytes of its textual representation.
                byte[] error = ByteUtilities.StringToByteArrays(_lastException.Value.ToString());
                proto.result = ByteUtilities.CopyBytesFrom(error);
                return proto;
            }

            if (_state == TaskState.Running)
            {
                foreach (TaskMessage message in GetMessages())
                {
                    TaskStatusProto.TaskMessageProto messageProto = new TaskStatusProto.TaskMessageProto();
                    messageProto.source_id = message.MessageSourceId;
                    messageProto.message = ByteUtilities.CopyBytesFrom(message.Message);
                    proto.task_message.Add(messageProto);
                }
            }

            return proto;
        }

        /// <summary>
        /// Remembers the exception (unless one is already stored), moves to
        /// <see cref="TaskState.Failed"/> and stops the task life cycle, without sending a heartbeat.
        /// </summary>
        /// <param name="e">The exception to remember.</param>
        internal void RecordExecptionWithoutHeartbeat(Exception e)
        {
            bool alreadyRecorded = _lastException.IsPresent();
            if (!alreadyRecorded)
            {
                _lastException = Optional<Exception>.Of(e);
            }

            State = TaskState.Failed;
            _taskLifeCycle.Stop();
        }

        /// <summary>
        /// Transition table used by the <see cref="State"/> setter. States without an explicit
        /// entry (Suspended, Failed, Done, Killed) accept every target state.
        /// </summary>
        private static bool IsLegalStateTransition(TaskState? from, TaskState to)
        {
            if (!from.HasValue)
            {
                return to == TaskState.Init;
            }

            // Failed and Killed are legal targets from every state listed below.
            bool toFailedOrKilled = to == TaskState.Failed || to == TaskState.Killed;

            switch (from.Value)
            {
                case TaskState.Init:
                    return toFailedOrKilled
                        || to == TaskState.Init
                        || to == TaskState.Running
                        || to == TaskState.Done;
                case TaskState.Running:
                    return toFailedOrKilled
                        || to == TaskState.CloseRequested
                        || to == TaskState.SuspendRequested
                        || to == TaskState.Done;
                case TaskState.CloseRequested:
                    return toFailedOrKilled || to == TaskState.Done;
                case TaskState.SuspendRequested:
                    return toFailedOrKilled || to == TaskState.Suspended;
                default:
                    return true;
            }
        }

        /// <summary>
        /// Resolves the conflict of having both a result and an exception: the exception wins,
        /// the state becomes Failed and the result is discarded (after being logged).
        /// </summary>
        private void Check()
        {
            if (!_result.IsPresent() || !_lastException.IsPresent())
            {
                return;
            }

            LOGGER.Log(Level.Warning, "Both task result and exception are present, the expcetion will take over. Thrown away result:" + ByteUtilities.ByteArrarysToString(_result.Value));
            State = TaskState.Failed;
            _result = Optional<byte[]>.Empty();
        }

        /// <summary>Sends the current status proto to the heartbeat manager.</summary>
        private void Heartbeat()
        {
            _heartBeatManager.OnNext(ToProto());
        }

        /// <summary>
        /// Maps the internal state to the protocol state. CloseRequested and SuspendRequested
        /// are reported as RUNNING.
        /// </summary>
        private State GetProtoState()
        {
            switch (_state)
            {
                case TaskState.Init:
                    return Protobuf.ReefProtocol.State.INIT;
                case TaskState.Running:
                case TaskState.CloseRequested:
                case TaskState.SuspendRequested:
                    return Protobuf.ReefProtocol.State.RUNNING;
                case TaskState.Done:
                    return Protobuf.ReefProtocol.State.DONE;
                case TaskState.Suspended:
                    return Protobuf.ReefProtocol.State.SUSPEND;
                case TaskState.Failed:
                    return Protobuf.ReefProtocol.State.FAILED;
                case TaskState.Killed:
                    return Protobuf.ReefProtocol.State.KILLED;
            }

            // Only reached for a value outside the enum's named members.
            Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("Unknown state: " + _state), LOGGER);

            // Fallback for the compiler; the call above is expected to throw.
            return Protobuf.ReefProtocol.State.FAILED;
        }

        /// <summary>
        /// Polls every configured message source once and returns the messages that were present.
        /// Returns an empty collection when no sources are configured.
        /// </summary>
        private ICollection<TaskMessage> GetMessages()
        {
            List<TaskMessage> collected = new List<TaskMessage>();
            if (!_evaluatorMessageSources.IsPresent())
            {
                return collected;
            }

            foreach (ITaskMessageSource source in _evaluatorMessageSources.Value)
            {
                Optional<TaskMessage> candidate = source.Message;
                if (candidate.IsPresent())
                {
                    collected.Add(candidate.Value);
                }
            }

            return collected;
        }
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using Org.Apache.REEF.Common.Tasks.Events;

namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
{
    /// <summary>
    /// Simple <see cref="ITaskStop"/> implementation that carries an id.
    /// </summary>
    public class TaskStopImpl : ITaskStop
    {
        private string _id;

        // INJECT
        // NOTE(review): the original marker comment suggests this constructor is meant for
        // injection, but no [Inject] attribute is present -- confirm.
        /// <summary>Creates the event with the given id.</summary>
        /// <param name="id">Value exposed through <see cref="Id"/>.</param>
        public TaskStopImpl(string id)
        {
            _id = id;
        }

        /// <summary>The id supplied at construction; it can be reassigned.</summary>
        public string Id
        {
            get { return _id; }
            set { _id = value; }
        }
    }
}
using System;
using System.IO;
using System.Linq;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Formats.AvroConfigurationDataContract;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Common.Runtime.Evaluator.Utils
{
    /// <summary>
    /// Read-only view over an Avro configuration file. The file is deserialized once in the
    /// constructor; individual settings are looked up on first access and then cached.
    /// </summary>
    public class EvaluatorConfigurations
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorConfigurations));

        private AvroConfiguration _avroConfiguration;

        private string _configFile;

        private string _applicationId;

        private string _evaluatorId;

        private string _taskConfiguration;

        private string _rootContextConfiguration;

        private string _rootServiceConfiguration;

        /// <summary>
        /// Loads the configuration from <paramref name="configFile"/>.
        /// A null/blank path is reported as <see cref="ArgumentNullException"/> and a missing
        /// file as <see cref="FileNotFoundException"/>, both through <c>Exceptions.Throw</c>.
        /// </summary>
        /// <param name="configFile">Path of the Avro configuration file.</param>
        public EvaluatorConfigurations(string configFile)
        {
            using (LOGGER.LogFunction("EvaluatorConfigurations::EvaluatorConfigurations"))
            {
                bool pathMissing = string.IsNullOrWhiteSpace(configFile);
                if (pathMissing)
                {
                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("configFile"), LOGGER);
                }

                bool fileMissing = !File.Exists(configFile);
                if (fileMissing)
                {
                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new FileNotFoundException("cannot find file " + configFile), LOGGER);
                }

                _configFile = configFile;
                _avroConfiguration = new AvroConfigurationSerializer().AvroDeseriaizeFromFile(_configFile);
            }
        }

        /// <summary>Value of the binding whose key ends with <c>Constants.TaskConfiguration</c>.</summary>
        public string TaskConfiguration
        {
            get
            {
                if (_taskConfiguration == null)
                {
                    _taskConfiguration = GetSettingValue(Constants.TaskConfiguration);
                }

                return _taskConfiguration;
            }
        }

        /// <summary>Value of the binding whose key ends with <c>Constants.EvaluatorIdentifier</c>.</summary>
        public string EvaluatorId
        {
            get
            {
                if (_evaluatorId == null)
                {
                    _evaluatorId = GetSettingValue(Constants.EvaluatorIdentifier);
                }

                return _evaluatorId;
            }
        }

        /// <summary>Value of the binding whose key ends with <c>Constants.ApplicationIdentifier</c>.</summary>
        public string ApplicationId
        {
            get
            {
                if (_applicationId == null)
                {
                    _applicationId = GetSettingValue(Constants.ApplicationIdentifier);
                }

                return _applicationId;
            }
        }

        /// <summary>Value of the binding whose key ends with <c>Constants.RootContextConfiguration</c>.</summary>
        public string RootContextConfiguration
        {
            get
            {
                if (_rootContextConfiguration == null)
                {
                    _rootContextConfiguration = GetSettingValue(Constants.RootContextConfiguration);
                }

                return _rootContextConfiguration;
            }
        }

        /// <summary>Value of the binding whose key ends with <c>Constants.RootServiceConfiguration</c>.</summary>
        public string RootServiceConfiguration
        {
            get
            {
                if (_rootServiceConfiguration == null)
                {
                    _rootServiceConfiguration = GetSettingValue(Constants.RootServiceConfiguration);
                }

                return _rootServiceConfiguration;
            }
        }

        /// <summary>
        /// Finds the single binding whose key ends with <paramref name="settingKey"/>
        /// (ordinal, case-insensitive).
        /// </summary>
        /// <returns>The binding's value, or <see cref="string.Empty"/> when no binding matches.</returns>
        /// <exception cref="InvalidOperationException">More than one binding matches (raised by <c>SingleOrDefault</c>).</exception>
        private string GetSettingValue(string settingKey)
        {
            Func<ConfigurationEntry, bool> keyMatches =
                binding => binding.key.EndsWith(settingKey, StringComparison.OrdinalIgnoreCase);

            ConfigurationEntry match = _avroConfiguration.Bindings.SingleOrDefault(keyMatches);
            return match == null ? string.Empty : match.value;
        }
    }
}
namespace Org.Apache.REEF.Common.Runtime.Evaluator.Utils
{
    /// <summary>
    /// Empty class: it declares no members of its own.
    /// NOTE(review): looks like a placeholder; its intended role cannot be told from this file.
    /// </summary>
    public class RemoteManager
    {
    }
}
using System;
using System.Diagnostics;
using System.Globalization;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Common.Runtime
{
    /// <summary>
    /// Reports CPU and memory figures for the machine and for the current process, based on
    /// Windows performance counters and <see cref="Process"/>. All figures are returned as
    /// strings formatted with the invariant culture, so they are stable in logs.
    /// </summary>
    public class MachineStatus
    {
        private static readonly PerformanceCounter _cpuCounter;

        private static readonly PerformanceCounter _ramCounter;

        private static readonly PerformanceCounter _processCpuCounter;

        private static readonly Process _process;

        // Turned off permanently by ToString() after the first failure.
        private static bool _checkStatus;

        static MachineStatus()
        {
            _checkStatus = true;
            _process = Process.GetCurrentProcess();
            string processName = _process.ProcessName;

            _cpuCounter = new PerformanceCounter()
            {
                CategoryName = "Processor",
                CounterName = "% Processor Time",
                InstanceName = "_Total",
            };

            _ramCounter = new PerformanceCounter()
            {
                CategoryName = "Memory",
                CounterName = "Available MBytes"
            };

            _processCpuCounter = new PerformanceCounter()
            {
                CategoryName = "Process",
                CounterName = "% Processor Time",
                InstanceName = processName
            };
        }

        /// <summary>
        /// Machine-wide CPU usage, e.g. "12.5%". The value is computed between two counter
        /// reads, so the first read yields 0.
        /// </summary>
        public static string CurrentNodeCpuUsage
        {
            get
            {
                return _cpuCounter.NextValue().ToString(CultureInfo.InvariantCulture) + "%";
            }
        }

        /// <summary>Available physical memory as reported by the "Available MBytes" counter, e.g. "2048MB".</summary>
        public static string AvailableMemory
        {
            get
            {
                return _ramCounter.NextValue().ToString(CultureInfo.InvariantCulture) + "MB";
            }
        }

        /// <summary>
        /// Working set of the current process in units of 1,000,000 bytes. Reflects the last
        /// <see cref="Process.Refresh"/>; <see cref="ToString"/> refreshes before reading.
        /// </summary>
        public static string CurrentProcessMemoryUsage
        {
            get
            {
                return ((float)_process.WorkingSet64 / 1000000.0).ToString(CultureInfo.InvariantCulture) + "MB";
            }
        }

        /// <summary>
        /// Peak working set of the current process in units of 1,000,000 bytes. Reflects the last
        /// <see cref="Process.Refresh"/>; <see cref="ToString"/> refreshes before reading.
        /// </summary>
        public static string PeakProcessMemoryUsage
        {
            get
            {
                return ((float)_process.PeakWorkingSet64 / 1000000.0).ToString(CultureInfo.InvariantCulture) + "MB";
            }
        }

        /// <summary>
        /// CPU usage of the current process as a percentage of the whole machine.
        /// </summary>
        /// <remarks>
        /// Uses the calculated counter value rather than the raw one: the raw value of
        /// "% Processor Time" is an accumulated time count, not a percentage. The per-process
        /// counter can reach 100 per logical processor, so it is divided by the processor count
        /// to be comparable with <see cref="CurrentNodeCpuUsage"/>. The first read yields 0.
        /// This may not be accurate if there are multiple evaluator processes running on a
        /// single machine, because the counter instance is selected by process name.
        /// </remarks>
        public static string CurrentProcessCpuUsage
        {
            get
            {
                float percent = _processCpuCounter.NextValue() / Environment.ProcessorCount;
                return percent.ToString(CultureInfo.InvariantCulture) + "%";
            }
        }

        /// <summary>
        /// Returns a human-readable summary of all figures. Never throws because of a counter
        /// failure: the first failure is logged as a warning, reported in the returned text and
        /// disables further attempts for the lifetime of the process.
        /// </summary>
        public override string ToString()
        {
            string info = "No machine status information retrieved. Could be due to lack of admin right to get the info.";
            if (_checkStatus)
            {
                try
                {
                    _process.Refresh();
                    info = string.Format(
                        CultureInfo.InvariantCulture,
                        "current node is running at [{0}] CPU usage and with [{1}] memory available.{2} current evaluator process is using [{3}] of CPU and [{4}] of memory, with a peak memory usage of [{5}]",
                        CurrentNodeCpuUsage,
                        AvailableMemory,
                        Environment.NewLine,
                        CurrentProcessCpuUsage,
                        CurrentProcessMemoryUsage,
                        PeakProcessMemoryUsage);
                }
                catch (Exception e)
                {
                    _checkStatus = false; // It only takes one exception to switch the checking off for good.
                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Cannot obtain machine status due to error", Logger.GetLogger(typeof(MachineStatus)));

                    // We do not want to crash the evaluator just because we cannot get the information.
                    info = "Cannot obtain machine status due to error " + e.Message;
                }
            }

            return info;
        }
    }
}
namespace Org.Apache.REEF.Common.Services
{
    /// <summary>
    /// Marker interface for services; it declares no members.
    /// </summary>
    public interface IService
    {
    }
}
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;

[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1401:FieldsMustBePrivate", Justification = "static field, typical usage in configurations")]

namespace Org.Apache.REEF.Common.Services
{
    /// <summary>
    /// Configuration module for services. A configuration built here can accompany a
    /// ContextConfiguration when a context is formed. Unlike bindings made in the
    /// ContextConfiguration, bindings made here are passed on to child contexts.
    /// </summary>
    public class ServiceConfiguration : ConfigurationModuleBuilder
    {
        /// <summary>
        /// The services to instantiate. Every class listed is instantiated in the context and
        /// its instance is made available to child contexts and tasks.
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly OptionalParameter<IService> Services = new OptionalParameter<IService>();

        /// <summary>Creates an empty builder; <see cref="TangConfig"/> stays null.</summary>
        public ServiceConfiguration()
            : base()
        {
        }

        /// <summary>
        /// Creates a builder whose <see cref="TangConfig"/> is deserialized from
        /// <paramref name="config"/> using <c>AvroConfigurationSerializer.FromString</c>.
        /// </summary>
        /// <param name="config">Serialized configuration text.</param>
        public ServiceConfiguration(string config)
        {
            AvroConfigurationSerializer serializer = new AvroConfigurationSerializer();
            TangConfig = serializer.FromString(config);
        }

        /// <summary>
        /// Builds a new module that binds <see cref="Services"/> as entries of the
        /// <c>ServicesSet</c> named parameter. A fresh module is built on every access.
        /// </summary>
        public static ConfigurationModule ConfigurationModule
        {
            get
            {
                ServiceConfiguration builder = new ServiceConfiguration();
                return builder
                    .BindSetEntry(GenericType<ServicesSet>.Class, Services)
                    .Build();
            }
        }

        /// <summary>Configuration parsed by the string constructor; null otherwise.</summary>
        public IConfiguration TangConfig { get; private set; }
    }

    /// <summary>
    /// Injection target that receives the set bound to <c>ServicesSet</c>.
    /// </summary>
    public class InjectedServices
    {
        private ISet<IService> _services;

        /// <summary>Stores the injected set of services.</summary>
        /// <param name="services">The set bound to the <c>ServicesSet</c> named parameter.</param>
        [Inject]
        public InjectedServices([Parameter(typeof(ServicesSet))] ISet<IService> services)
        {
            _services = services;
        }

        /// <summary>The injected services; the reference can be replaced by callers.</summary>
        public ISet<IService> Services
        {
            get { return _services; }
            set { _services = value; }
        }
    }

    /// <summary>Named parameter holding the set of services.</summary>
    [NamedParameter("Set of services", "servicesSet", "")]
    internal class ServicesSet : Name<ISet<IService>>
    {
    }
}
using Org.Apache.REEF.Tang.Annotations;

namespace Org.Apache.REEF.Common.Services
{
    /// <summary>
    /// Container for the named parameters related to service configuration.
    /// </summary>
    public class ServicesConfigurationOptions
    {
        /// <summary>
        /// String-valued named parameter, declared with documentation "Services",
        /// short name "services" and default value "services".
        /// </summary>
        [NamedParameter("Services", "services", "services")]
        public class Services : Name<string>
        {
        }
    }
}
using System;
using Org.Apache.REEF.Common.Tasks.Events;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Common.Tasks.Defaults
{
    /// <summary>
    /// Fallback <see cref="IDriverMessageHandler"/> that rejects every message: receiving one
    /// means no real handler was bound.
    /// </summary>
    public class DefaultDriverMessageHandler : IDriverMessageHandler
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(DefaultDriverMessageHandler));

        [Inject]
        public DefaultDriverMessageHandler()
        {
        }

        /// <summary>
        /// Always reports an <see cref="InvalidOperationException"/> through
        /// <c>Exceptions.Throw</c>; the text includes the received message.
        /// </summary>
        /// <param name="message">The driver message that could not be handled.</param>
        public void Handle(IDriverMessage message)
        {
            // A separator keeps the fixed text and the message from running together.
            Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("No DriverMessage handler bound. Message received: " + message), LOGGER);
        }
    }
}
using System;
using System.Globalization;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities;

namespace Org.Apache.REEF.Common.Tasks.Defaults
{
    /// <summary>
    /// Default <see cref="ITaskMessageSource"/>: every read produces a new message stamped with
    /// the current local time.
    /// </summary>
    public class DefaultTaskMessageSource : ITaskMessageSource
    {
        [Inject]
        public DefaultTaskMessageSource()
        {
        }

        /// <summary>
        /// Getter: returns a present message with source id "defaultSourceId" whose payload is
        /// the text "default message generated at " followed by <see cref="DateTime.Now"/>
        /// formatted with the invariant culture.
        /// Setter: accepts a value and discards it.
        /// </summary>
        public Optional<TaskMessage> Message
        {
            get
            {
                string timestamp = DateTime.Now.ToString(CultureInfo.InvariantCulture);
                byte[] payload = ByteUtilities.StringToByteArrays("default message generated at " + timestamp);
                TaskMessage generated = TaskMessage.From("defaultSourceId", payload);
                return Optional<TaskMessage>.Of(generated);
            }

            set
            {
                // Assigned values are ignored; the getter always generates its own message.
            }
        }
    }
}
using Org.Apache.REEF.Utilities;

namespace Org.Apache.REEF.Common.Tasks.Events
{
    /// <summary>
    /// Event carrying an optional byte payload.
    /// NOTE(review): presumably delivered to a task when it is asked to close -- confirm against callers.
    /// </summary>
    public interface ICloseEvent
    {
        /// <summary>Optional payload of the event; readable and writable.</summary>
        Optional<byte[]> Value { get; set; }
    }
}
using Org.Apache.REEF.Utilities;

namespace Org.Apache.REEF.Common.Tasks.Events
{
    /// <summary>
    /// A message with an optional byte payload.
    /// NOTE(review): presumably sent from the driver to a task -- confirm against callers.
    /// </summary>
    public interface IDriverMessage
    {
        /// <summary>Optional payload of the message; read-only.</summary>
        Optional<byte[]> Message { get; }
    }
}
namespace Org.Apache.REEF.Common.Tasks.Events
{
    /// <summary>
    /// Marker interface for suspend events; it declares no members.
    /// </summary>
    public interface ISuspendEvent
    {
    }
}
+ */ + +namespace Org.Apache.REEF.Common.Tasks.Events +{ + public interface ITaskStart + { + string Id { get; set; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ITaskStop.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ITaskStop.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ITaskStop.cs new file mode 100644 index 0000000..b80b38b --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ITaskStop.cs @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +namespace Org.Apache.REEF.Common.Tasks.Events +{ + public interface ITaskStop + { + string Id { get; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverMessageHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverMessageHandler.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverMessageHandler.cs new file mode 100644 index 0000000..0b8e7e7 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverMessageHandler.cs @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using Org.Apache.REEF.Common.Tasks.Events; + +namespace Org.Apache.REEF.Common.Tasks +{ + //[DefaultImplementation(typeof(DefaultTaskMessageSource))] + public interface IDriverMessageHandler + { + void Handle(IDriverMessage message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Tasks/IRunningTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/IRunningTask.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/IRunningTask.cs new file mode 100644 index 0000000..236da5a --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Tasks/IRunningTask.cs @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using Org.Apache.REEF.Utilities; + +namespace Org.Apache.REEF.Common.Tasks +{ + /// <summary> + /// Represents a running Task + /// </summary> + public interface IRunningTask : IIdentifiable, IDisposable + { + /// <summary> + /// Sends the message to the running task. + /// </summary> + /// <param name="message">the message that is sent to the running task.</param> + void OnNext(byte[] message); + + /// <summary> + /// Signal the task to suspend. 
+ /// </summary> + /// <param name="message">a message that is sent to the Task.</param> + void Suspend(byte[] message); + + /// <summary> + /// Signal the task to suspend. + /// </summary> + void Suspend(); + + /// <summary> + /// Signal the task to shut down. + /// </summary> + /// <param name="message">a message that is sent to the Task.</param> + void Dispose(byte[] message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Tasks/ITask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/ITask.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/ITask.cs new file mode 100644 index 0000000..37a1d46 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Tasks/ITask.cs @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System; + +namespace Org.Apache.REEF.Common.Tasks +{ + public interface ITask : IDisposable + { + byte[] Call(byte[] memento); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Tasks/ITaskMessageSource.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/ITaskMessageSource.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/ITaskMessageSource.cs new file mode 100644 index 0000000..d9830c4 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Tasks/ITaskMessageSource.cs @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Common.Tasks.Defaults; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities; + +namespace Org.Apache.REEF.Common.Tasks +{ + [DefaultImplementation(typeof(DefaultTaskMessageSource))] + public interface ITaskMessageSource + { + Optional<TaskMessage> Message { get; set; } + } +} \ No newline at end of file
