// File: lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/CloseEventImpl.cs
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using Org.Apache.Reef.Tasks.Events;
using Org.Apache.Reef.Utilities;

namespace Org.Apache.Reef.Common
{
    /// <summary>
    /// Close event carrying an optional byte[] payload.
    /// </summary>
    public class CloseEventImpl : ICloseEvent
    {
        /// <summary>
        /// Creates a close event without a payload.
        /// </summary>
        public CloseEventImpl()
        {
            Value = Optional<byte[]>.Empty();
        }

        /// <summary>
        /// Creates a close event wrapping the given payload.
        /// </summary>
        /// <param name="bytes">payload bytes; may be null, in which case it is wrapped via OfNullable.</param>
        public CloseEventImpl(byte[] bytes)
        {
            Value = Optional<byte[]>.OfNullable(bytes);
        }

        /// <summary>
        /// The payload of this event.
        /// </summary>
        /// <remarks>
        /// Auto-implemented on purpose: hand-written accessors that name the property itself
        /// (get { return Value; }) re-enter the accessor and never terminate, so the storage
        /// must be a compiler-generated (or explicit) backing field.
        /// </remarks>
        public Optional<byte[]> Value { get; set; }

        /// <summary>
        /// Returns "CloseEvent{value=...}" using the payload's own ToString().
        /// </summary>
        public override string ToString()
        {
            return "CloseEvent{value=" + Value.ToString() + "}";
        }
    }
}
// File: lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/DriverMessageImpl.cs
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using Org.Apache.Reef.Tasks.Events;
using Org.Apache.Reef.Utilities;

namespace Org.Apache.Reef.Common.Runtime.Evaluator
{
    /// <summary>
    /// Driver message carrying an optional byte[] payload.
    /// </summary>
    public class DriverMessageImpl : IDriverMessage
    {
        // Assigned once in a constructor and never replaced afterwards.
        private readonly Optional<byte[]> _value;

        /// <summary>
        /// Creates a driver message without a payload.
        /// </summary>
        public DriverMessageImpl()
        {
            _value = Optional<byte[]>.Empty();
        }

        /// <summary>
        /// Creates a driver message wrapping the given payload.
        /// </summary>
        /// <param name="bytes">payload bytes; may be null, in which case it is wrapped via OfNullable.</param>
        public DriverMessageImpl(byte[] bytes)
        {
            _value = Optional<byte[]>.OfNullable(bytes);
        }

        /// <summary>
        /// The payload of this message.
        /// </summary>
        public Optional<byte[]> Message
        {
            get
            {
                return _value;
            }
        }

        /// <summary>
        /// Returns "DriverMessage [value=...]" with the payload rendered through
        /// ByteUtilities.ByteArrarysToString; an absent payload is rendered as an empty string.
        /// </summary>
        public override string ToString()
        {
            // Only dereference the payload when one is present: the parameterless constructor
            // stores an empty Optional, and ToString() should be safe to call for logging.
            string text = _value.IsPresent()
                ? ByteUtilities.ByteArrarysToString(_value.Value)
                : string.Empty;
            return "DriverMessage [value=" + text + "]";
        }
    }
}
// File: lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/SuspendEventImpl.cs
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using Org.Apache.Reef.Tasks.Events;
using Org.Apache.Reef.Utilities;

namespace Org.Apache.Reef.Common
{
    /// <summary>
    /// Suspend event carrying an optional byte[] payload.
    /// </summary>
    /// <remarks>
    /// NOTE(review): this type implements ICloseEvent, not ISuspendEvent. That looks like a
    /// copy/paste slip, but overload resolution in callers depends on the declared interface,
    /// so it is left unchanged here - confirm against the callers before switching it.
    /// </remarks>
    public class SuspendEventImpl : ICloseEvent
    {
        /// <summary>
        /// Creates a suspend event without a payload.
        /// </summary>
        public SuspendEventImpl()
        {
            Value = Optional<byte[]>.Empty();
        }

        /// <summary>
        /// Creates a suspend event wrapping the given payload.
        /// </summary>
        /// <param name="bytes">payload bytes; may be null, in which case it is wrapped via OfNullable.</param>
        public SuspendEventImpl(byte[] bytes)
        {
            Value = Optional<byte[]>.OfNullable(bytes);
        }

        /// <summary>
        /// The payload of this event.
        /// </summary>
        /// <remarks>
        /// Auto-implemented on purpose: hand-written accessors that name the property itself
        /// (get { return Value; }) re-enter the accessor and never terminate, so the storage
        /// must be a compiler-generated (or explicit) backing field.
        /// </remarks>
        public Optional<byte[]> Value { get; set; }

        /// <summary>
        /// Returns "SuspendEvent{value=...}" using the payload's own ToString().
        /// </summary>
        public override string ToString()
        {
            return "SuspendEvent{value=" + Value.ToString() + "}";
        }
    }
}
// File: lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskClientCodeException.cs
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using Org.Apache.Reef.Tang.Interface;

namespace Org.Apache.Reef.Common.Task
{
    /// <summary>
    /// Exception that records which task, in which context, failed.
    /// </summary>
    public class TaskClientCodeException : Exception
    {
        /// <summary>
        /// Builds the exception describing a task failure.
        /// </summary>
        /// <param name="taskId">id of the failed task.</param>
        /// <param name="contextId">id of the context the failed task was executing in.</param>
        /// <param name="message">the error message.</param>
        /// <param name="cause">the exception that caused the task to fail; becomes the inner exception.</param>
        public TaskClientCodeException(string taskId, string contextId, string message, Exception cause)
            : base(message, cause)
        {
            TaskId = taskId;
            ContextId = contextId;
        }

        /// <summary>
        /// Id of the failed task, as supplied to the constructor.
        /// </summary>
        public string TaskId { get; private set; }

        /// <summary>
        /// Id of the context the failed task ran in, as supplied to the constructor.
        /// </summary>
        public string ContextId { get; private set; }

        /// <summary>
        /// Placeholder: currently ignores the configuration and always returns an empty string.
        /// </summary>
        /// <param name="c">configuration (unused for now).</param>
        /// <returns>an empty string.</returns>
        public static string GetTaskIdentifier(IConfiguration c)
        {
            // TODO: update after TANG is available
            return string.Empty;
        }
    }
}
// File: lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskLifeCycle.cs
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using Org.Apache.Reef.Tasks.Events;
using Org.Apache.Reef.Wake;
using System;
using System.Collections.Generic;

namespace Org.Apache.Reef.Common
{
    /// <summary>
    /// Holds the task start/stop observers and pushes the start/stop events to them.
    /// </summary>
    public class TaskLifeCycle
    {
        private readonly HashSet<IObserver<ITaskStop>> _taskStopHandlers;
        private readonly HashSet<IObserver<ITaskStart>> _taskStartHandlers;
        private readonly ITaskStart _taskStart;
        private readonly ITaskStop _taskStop;

        // INJECT
        /// <summary>
        /// Creates a life cycle with the given handler sets and the events to hand to them.
        /// </summary>
        /// <param name="taskStopHandlers">observers notified by <see cref="Stop"/>.</param>
        /// <param name="taskStartHandlers">observers notified by <see cref="Start"/>.</param>
        /// <param name="taskStart">event passed to every start observer.</param>
        /// <param name="taskStop">event passed to every stop observer.</param>
        public TaskLifeCycle(
            HashSet<IObserver<ITaskStop>> taskStopHandlers,
            HashSet<IObserver<ITaskStart>> taskStartHandlers,
            TaskStartImpl taskStart,
            TaskStopImpl taskStop)
        {
            _taskStopHandlers = taskStopHandlers;
            _taskStartHandlers = taskStartHandlers;
            _taskStop = taskStop;
            _taskStart = taskStart;
        }

        /// <summary>
        /// Creates a life cycle with no observers; the start/stop events stay null.
        /// </summary>
        public TaskLifeCycle()
            : this(new HashSet<IObserver<ITaskStop>>(), new HashSet<IObserver<ITaskStart>>(), null, null)
        {
        }

        /// <summary>
        /// Calls OnNext with the start event on every registered start observer.
        /// </summary>
        public void Start()
        {
            foreach (IObserver<ITaskStart> observer in _taskStartHandlers)
            {
                observer.OnNext(_taskStart);
            }
        }

        /// <summary>
        /// Calls OnNext with the stop event on every registered stop observer.
        /// </summary>
        public void Stop()
        {
            foreach (IObserver<ITaskStop> observer in _taskStopHandlers)
            {
                observer.OnNext(_taskStop);
            }
        }
    }
}
// File: lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskRuntime.cs
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using Org.Apache.Reef.Common.io;
using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
using Org.Apache.Reef.Common.Runtime.Evaluator;
using Org.Apache.Reef.Common.Task;
using Org.Apache.Reef.Tasks;
using Org.Apache.Reef.Tasks.Events;
using Org.Apache.Reef.Utilities;
using Org.Apache.Reef.Utilities.Logging;
using Org.Apache.Reef.Tang.Exceptions;
using Org.Apache.Reef.Tang.Interface;
using System;
using System.Collections.Generic;
using System.Globalization;

namespace Org.Apache.Reef.Common
{
    /// <summary>
    /// Hosts one injected ITask: runs it, tracks its state through a TaskStatus, and receives
    /// close, suspend and driver-message events for it.
    /// </summary>
    public class TaskRuntime : IObserver<ICloseEvent>, IObserver<ISuspendEvent>, IObserver<IDriverMessage>
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskRuntime));

        private readonly ITask _task;

        private readonly IInjector _injector;

        // The memento given by the task configuration
        private readonly Optional<byte[]> _memento;

        private readonly HeartBeatManager _heartBeatManager;

        private readonly TaskStatus _currentStatus;

        private readonly INameClient _nameClient;

        /// <summary>
        /// Injects the task (mandatory) plus an optional task message source and name client,
        /// and creates the status tracker for the task.
        /// </summary>
        /// <param name="taskInjector">injector used to obtain the task and its optional collaborators.</param>
        /// <param name="contextId">id of the context the task runs in.</param>
        /// <param name="taskId">id of the task.</param>
        /// <param name="heartBeatManager">heartbeat manager handed to the task status.</param>
        /// <param name="memento">optional memento string; converted to bytes and passed to the task when it runs.</param>
        public TaskRuntime(IInjector taskInjector, string contextId, string taskId, HeartBeatManager heartBeatManager, string memento = null)
        {
            _injector = taskInjector;
            _heartBeatManager = heartBeatManager;

            Optional<ISet<ITaskMessageSource>> messageSources = Optional<ISet<ITaskMessageSource>>.Empty();
            try
            {
                _task = _injector.GetInstance<ITask>();
            }
            catch (Exception e)
            {
                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Unable to inject task.", e), Level.Error, "Unable to inject task.", LOGGER);
            }
            try
            {
                ITaskMessageSource taskMessageSource = _injector.GetInstance<ITaskMessageSource>();
                messageSources = Optional<ISet<ITaskMessageSource>>.Of(new HashSet<ITaskMessageSource>() { taskMessageSource });
            }
            catch (Exception e)
            {
                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Cannot inject task message source with error: " + e.StackTrace, LOGGER);
                // do not rethrow since this is benign
            }
            try
            {
                _nameClient = _injector.GetInstance<INameClient>();
                _heartBeatManager.EvaluatorSettings.NameClient = _nameClient;
            }
            catch (InjectionException)
            {
                LOGGER.Log(Level.Warning, "Cannot inject name client from task configuration.");
                // do not rethrow since user is not required to provide name client
            }

            // Only report success when the message source really was injected; the failure
            // path above has already logged its own warning.
            if (messageSources.IsPresent())
            {
                LOGGER.Log(Level.Info, "task message source injected");
            }
            _currentStatus = new TaskStatus(_heartBeatManager, contextId, taskId, messageSources);
            _memento = memento == null ?
                Optional<byte[]>.Empty() : Optional<byte[]>.Of(ByteUtilities.StringToByteArrays(memento));
        }

        /// <summary>
        /// Id of the task, as held by the task status.
        /// </summary>
        public string TaskId
        {
            get { return _currentStatus.TaskId; }
        }

        /// <summary>
        /// Id of the context, as held by the task status.
        /// </summary>
        public string ContextId
        {
            get { return _currentStatus.ContextId; }
        }

        /// <summary>
        /// Asks the task status to move to the running state.
        /// </summary>
        public void Initialize()
        {
            _currentStatus.SetRunning();
        }

        /// <summary>
        /// Run the task: executes ITask.Call on a System.Threading.Tasks.Task, waits for it,
        /// disposes the task and records the result. Any exception on the way is recorded on
        /// the task status instead of being propagated to the caller.
        /// </summary>
        public void Start()
        {
            try
            {
                LOGGER.Log(Level.Info, "Call Task");
                if (_currentStatus.IsNotRunning())
                {
                    var e = new InvalidOperationException("TaskRuntime not in Running state, instead it is in state " + _currentStatus.State);
                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
                }
                byte[] result;
                byte[] taskMemento = _memento.IsPresent() ? _memento.Value : null;
                System.Threading.Tasks.Task<byte[]> runTask = new System.Threading.Tasks.Task<byte[]>(() => RunTask(taskMemento));
                try
                {
                    runTask.Start();
                    runTask.Wait();
                }
                catch (Exception e)
                {
                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, "Exception thrown during task running.", LOGGER);
                }
                result = runTask.Result;

                LOGGER.Log(Level.Info, "Task Call Finished");
                // Dispose before publishing the result, so the task is released before the
                // status change (and its heartbeat) goes out.
                DisposeTask();
                _currentStatus.SetResult(result);
                if (result != null && result.Length > 0)
                {
                    LOGGER.Log(Level.Info, "Task running result:\r\n" + System.Text.Encoding.Default.GetString(result));
                }
            }
            catch (Exception e)
            {
                DisposeTask();
                LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Task failed caused by exception [{0}]", e));
                _currentStatus.SetException(e);
            }
        }

        /// <summary>
        /// Current state of the task.
        /// </summary>
        public TaskState GetTaskState()
        {
            return _currentStatus.State;
        }

        /// <summary>
        /// Called by heartbeat manager
        /// </summary>
        /// <returns> current TaskStatusProto </returns>
        public TaskStatusProto GetStatusProto()
        {
            return _currentStatus.ToProto();
        }

        /// <summary>
        /// Whether the task status reports that the task has ended.
        /// </summary>
        public bool HasEnded()
        {
            return _currentStatus.HasEnded();
        }

        /// <summary>
        /// get ID of the task.
        /// </summary>
        /// <returns>ID of the task.</returns>
        public string GetActicityId()
        {
            return _currentStatus.TaskId;
        }

        /// <summary>
        /// Requests the task to close. Ignored (with a warning) unless the task is running;
        /// a failure while dispatching the event is recorded on the task status.
        /// </summary>
        /// <param name="message">optional payload for the close event.</param>
        public void Close(byte[] message)
        {
            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to close Task {0}", TaskId));
            if (_currentStatus.IsNotRunning())
            {
                LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to close an task that is in {0} state. Ignored.", _currentStatus.State));
            }
            else
            {
                try
                {
                    OnNext(new CloseEventImpl(message));
                    _currentStatus.SetCloseRequested();
                }
                catch (Exception e)
                {
                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Close.", LOGGER);

                    _currentStatus.SetException(
                        new TaskClientCodeException(TaskId, ContextId, "Error during Close().", e));
                }
            }
        }

        /// <summary>
        /// Requests the task to suspend. Ignored (with a warning) unless the task is running;
        /// a failure while dispatching the event is recorded on the task status.
        /// </summary>
        /// <param name="message">optional payload for the suspend event.</param>
        public void Suspend(byte[] message)
        {
            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to suspend Task {0}", TaskId));

            if (_currentStatus.IsNotRunning())
            {
                LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to supend an task that is in {0} state. Ignored.", _currentStatus.State));
            }
            else
            {
                try
                {
                    OnNext(new SuspendEventImpl(message));
                    _currentStatus.SetSuspendRequested();
                }
                catch (Exception e)
                {
                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Suspend.", LOGGER);
                    _currentStatus.SetException(
                        new TaskClientCodeException(TaskId, ContextId, "Error during Suspend().", e));
                }
            }
        }

        /// <summary>
        /// Delivers a driver message to the task. Ignored (with a warning) unless the task is
        /// running; a failure while dispatching is recorded on the task status.
        /// </summary>
        /// <param name="message">payload of the driver message.</param>
        public void Deliver(byte[] message)
        {
            if (_currentStatus.IsNotRunning())
            {
                LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to send a message to an task that is in {0} state. Ignored.", _currentStatus.State));
            }
            else
            {
                try
                {
                    OnNext(new DriverMessageImpl(message));
                }
                catch (Exception e)
                {
                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during message delivery.", LOGGER);
                    _currentStatus.SetException(
                        new TaskClientCodeException(TaskId, ContextId, "Error during message delivery.", e));
                }
            }
        }

        /// <summary>
        /// Receives a close event; currently only logs it.
        /// </summary>
        public void OnNext(ICloseEvent value)
        {
            LOGGER.Log(Level.Info, "TaskRuntime::OnNext(ICloseEvent value)");
            // TODO: send a heartbeat
        }

        void IObserver<ICloseEvent>.OnError(Exception error)
        {
            throw new NotImplementedException();
        }

        void IObserver<IDriverMessage>.OnCompleted()
        {
            throw new NotImplementedException();
        }

        void IObserver<IDriverMessage>.OnError(Exception error)
        {
            throw new NotImplementedException();
        }

        void IObserver<ISuspendEvent>.OnCompleted()
        {
            throw new NotImplementedException();
        }

        void IObserver<ISuspendEvent>.OnError(Exception error)
        {
            throw new NotImplementedException();
        }

        void IObserver<ICloseEvent>.OnCompleted()
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// Receives a suspend event; currently only logs it.
        /// </summary>
        public void OnNext(ISuspendEvent value)
        {
            LOGGER.Log(Level.Info, "TaskRuntime::OnNext(ISuspendEvent value)");
            // TODO: send a heartbeat
        }

        /// <summary>
        /// Receives a driver message: injects an IDriverMessageHandler and lets it handle the
        /// message. A handler exception is logged and recorded on the task status without a
        /// heartbeat; a failure to inject the handler is logged and rethrown.
        /// </summary>
        public void OnNext(IDriverMessage value)
        {
            IDriverMessageHandler messageHandler = null;
            LOGGER.Log(Level.Info, "TaskRuntime::OnNext(IDriverMessage value)");
            try
            {
                messageHandler = _injector.GetInstance<IDriverMessageHandler>();
            }
            catch (Exception e)
            {
                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, "Received Driver message, but unable to inject handler for driver message ", LOGGER);
            }
            if (messageHandler != null)
            {
                try
                {
                    messageHandler.Handle(value);
                }
                catch (Exception e)
                {
                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Exception throw when handling driver message: " + e, LOGGER);
                    _currentStatus.RecordExecptionWithoutHeartbeat(e);
                }
            }
        }

        // Invokes the user task with the given memento (null when none was configured).
        private byte[] RunTask(byte[] memento)
        {
            return _task.Call(memento);
        }

        // Disposes the task if one was injected; shared by the success and failure paths of Start().
        private void DisposeTask()
        {
            if (_task != null)
            {
                _task.Dispose();
            }
        }
    }
}
// File: lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStartImpl.cs
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using Org.Apache.Reef.Tasks.Events;

namespace Org.Apache.Reef.Common
{
    /// <summary>
    /// Task start event identified by a string id.
    /// </summary>
    public class TaskStartImpl : ITaskStart
    {
        // Storage behind the Id property.
        private string _id;

        //INJECT
        /// <summary>
        /// Creates the start event with the given id.
        /// </summary>
        /// <param name="id">value exposed through <see cref="Id"/>.</param>
        public TaskStartImpl(string id)
        {
            _id = id;
        }

        /// <summary>
        /// The id supplied at construction; may be replaced later.
        /// </summary>
        public string Id
        {
            get { return _id; }
            set { _id = value; }
        }
    }
}
// File: lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskState.cs
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

namespace Org.Apache.Reef.Common
{
    /// <summary>
    /// States a task can be in. The numeric values are explicit and must stay stable.
    /// </summary>
    public enum TaskState
    {
        /// <summary>Initial state (also the default value, 0).</summary>
        Init = 0,
        /// <summary>The task is running.</summary>
        Running = 1,
        /// <summary>A close was requested for the task.</summary>
        CloseRequested = 2,
        /// <summary>A suspend was requested for the task.</summary>
        SuspendRequested = 3,
        /// <summary>The task is suspended.</summary>
        Suspended = 4,
        /// <summary>The task failed.</summary>
        Failed = 5,
        /// <summary>The task is done.</summary>
        Done = 6,
        /// <summary>The task was killed.</summary>
        Killed = 7
    }
}
// File: lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStatus.cs
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
using Org.Apache.Reef.Tasks;
using Org.Apache.Reef.Utilities;
using Org.Apache.Reef.Utilities.Logging;
using System;
using System.Collections.Generic;
using System.Globalization;

namespace Org.Apache.Reef.Common
{
    /// <summary>
    /// Tracks the state, result and last exception of one task, validates state transitions,
    /// and pushes status heartbeats through the heartbeat manager.
    /// </summary>
    public class TaskStatus
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskStatus));

        private readonly TaskLifeCycle _taskLifeCycle;
        private readonly HeartBeatManager _heartBeatManager;
        private readonly Optional<ISet<ITaskMessageSource>> _evaluatorMessageSources;
        private readonly string _taskId;
        private readonly string _contextId;

        private Optional<Exception> _lastException = Optional<Exception>.Empty();
        private Optional<byte[]> _result = Optional<byte[]>.Empty();
        private TaskState _state;

        /// <summary>
        /// Creates the status tracker in the Init state with a fresh, observer-less life cycle.
        /// </summary>
        /// <param name="heartBeatManager">receiver of the status heartbeats.</param>
        /// <param name="contextId">id of the context the task runs in.</param>
        /// <param name="taskId">id of the task.</param>
        /// <param name="evaluatorMessageSources">optional message sources polled while the task is running.</param>
        public TaskStatus(HeartBeatManager heartBeatManager, string contextId, string taskId, Optional<ISet<ITaskMessageSource>> evaluatorMessageSources)
        {
            _contextId = contextId;
            _taskId = taskId;
            _heartBeatManager = heartBeatManager;
            _taskLifeCycle = new TaskLifeCycle();
            _evaluatorMessageSources = evaluatorMessageSources;
            State = TaskState.Init;
        }

        /// <summary>
        /// Current task state. Setting it validates the transition first; an illegal
        /// transition is logged and reported as an InvalidOperationException through the
        /// diagnostics helper, and the state is left unchanged.
        /// </summary>
        public TaskState State
        {
            get
            {
                return _state;
            }

            set
            {
                if (!IsLegalStateTransition(_state, value))
                {
                    string message = string.Format(CultureInfo.InvariantCulture, "Illegal state transition from [{0}] to [{1}]", _state, value);
                    LOGGER.Log(Level.Error, message);
                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(message), LOGGER);
                }
                else
                {
                    _state = value;
                }
            }
        }

        /// <summary>
        /// Id of the task.
        /// </summary>
        public string TaskId
        {
            get { return _taskId; }
        }

        /// <summary>
        /// Id of the context the task runs in.
        /// </summary>
        public string ContextId
        {
            get { return _contextId; }
        }

        /// <summary>
        /// Records the exception (moving to Failed), sends a heartbeat, then clears the
        /// recorded exception again.
        /// </summary>
        /// <param name="e">the exception to report.</param>
        public void SetException(Exception e)
        {
            RecordExecptionWithoutHeartbeat(e);
            Heartbeat();
            _lastException = Optional<Exception>.Empty();
        }

        /// <summary>
        /// Stores the task result, advances the state (Running/CloseRequested to Done,
        /// SuspendRequested to Suspended; other states are left as they are), notifies the
        /// stop handlers and sends a heartbeat.
        /// </summary>
        /// <param name="result">result bytes of the task; may be null.</param>
        public void SetResult(byte[] result)
        {
            _result = Optional<byte[]>.OfNullable(result);
            switch (State)
            {
                case TaskState.Running:
                case TaskState.CloseRequested:
                    State = TaskState.Done;
                    break;
                case TaskState.SuspendRequested:
                    State = TaskState.Suspended;
                    break;
            }
            _taskLifeCycle.Stop();
            Heartbeat();
        }

        /// <summary>
        /// Moves an Init task to Running. Does nothing when the task is in any other state.
        /// A failure along the way is logged and reported through <see cref="SetException"/>.
        /// </summary>
        public void SetRunning()
        {
            LOGGER.Log(Level.Verbose, "TaskStatus::SetRunning");
            if (_state != TaskState.Init)
            {
                return;
            }

            try
            {
                _taskLifeCycle.Start();
                // Need to send an INIT heartbeat to the driver prompting it to create an RunningTask event.
                // The heartbeat therefore goes out before the state is switched to Running.
                LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Sending task INIT heartbeat"));
                Heartbeat();
                State = TaskState.Running;
            }
            catch (Exception e)
            {
                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Cannot set task status to running.", LOGGER);
                SetException(e);
            }
        }

        /// <summary>
        /// Moves the state to CloseRequested (no heartbeat).
        /// </summary>
        public void SetCloseRequested()
        {
            State = TaskState.CloseRequested;
        }

        /// <summary>
        /// Moves the state to SuspendRequested (no heartbeat).
        /// </summary>
        public void SetSuspendRequested()
        {
            State = TaskState.SuspendRequested;
        }

        /// <summary>
        /// Moves the state to Killed and sends a heartbeat.
        /// </summary>
        public void SetKilled()
        {
            State = TaskState.Killed;
            Heartbeat();
        }

        /// <summary>
        /// True for every state other than Running.
        /// </summary>
        public bool IsNotRunning()
        {
            return _state != TaskState.Running;
        }

        /// <summary>
        /// True when the state is Done, Suspended, Failed or Killed.
        /// </summary>
        public bool HasEnded()
        {
            return _state == TaskState.Done
                || _state == TaskState.Suspended
                || _state == TaskState.Failed
                || _state == TaskState.Killed;
        }

        /// <summary>
        /// Builds the protocol message for the current status. The result is attached when
        /// present; otherwise the text of the last exception; otherwise, while Running, the
        /// messages currently offered by the message sources.
        /// </summary>
        /// <returns>the populated TaskStatusProto.</returns>
        public TaskStatusProto ToProto()
        {
            Check();
            TaskStatusProto proto = new TaskStatusProto()
            {
                context_id = _contextId,
                task_id = _taskId,
                state = GetProtoState(),
            };

            if (_result.IsPresent())
            {
                proto.result = ByteUtilities.CopyBytesFrom(_result.Value);
            }
            else if (_lastException.IsPresent())
            {
                // The exception travels as the bytes of its ToString() text.
                byte[] error = ByteUtilities.StringToByteArrays(_lastException.Value.ToString());
                proto.result = ByteUtilities.CopyBytesFrom(error);
            }
            else if (_state == TaskState.Running)
            {
                foreach (TaskMessage taskMessage in GetMessages())
                {
                    TaskStatusProto.TaskMessageProto messageProto = new TaskStatusProto.TaskMessageProto()
                    {
                        source_id = taskMessage.MessageSourceId,
                        message = ByteUtilities.CopyBytesFrom(taskMessage.Message),
                    };
                    proto.task_message.Add(messageProto);
                }
            }
            return proto;
        }

        /// <summary>
        /// Remembers the exception (unless one is already recorded), moves to Failed and
        /// notifies the stop handlers. No heartbeat is sent.
        /// </summary>
        /// <param name="e">the exception to record.</param>
        internal void RecordExecptionWithoutHeartbeat(Exception e)
        {
            if (!_lastException.IsPresent())
            {
                _lastException = Optional<Exception>.Of(e);
            }
            State = TaskState.Failed;
            _taskLifeCycle.Stop();
        }

        // Transition table. A null origin may only go to Init; every origin not listed
        // explicitly (Failed, Done, Killed, Suspended, unknown values) accepts any target.
        private static bool IsLegalStateTransition(TaskState? from, TaskState to)
        {
            if (!from.HasValue)
            {
                return to == TaskState.Init;
            }

            switch (from.Value)
            {
                case TaskState.Init:
                    return to == TaskState.Init
                        || to == TaskState.Running
                        || to == TaskState.Failed
                        || to == TaskState.Killed
                        || to == TaskState.Done;

                case TaskState.Running:
                    return to == TaskState.CloseRequested
                        || to == TaskState.SuspendRequested
                        || to == TaskState.Failed
                        || to == TaskState.Killed
                        || to == TaskState.Done;

                case TaskState.CloseRequested:
                    return to == TaskState.Failed
                        || to == TaskState.Killed
                        || to == TaskState.Done;

                case TaskState.SuspendRequested:
                    return to == TaskState.Failed
                        || to == TaskState.Killed
                        || to == TaskState.Suspended;

                default:
                    return true;
            }
        }

        // When both a result and an exception are recorded, the exception wins: the result is
        // logged and dropped, and the state is forced to Failed.
        private void Check()
        {
            if (_result.IsPresent() && _lastException.IsPresent())
            {
                LOGGER.Log(Level.Warning, "Both task result and exception are present, the expcetion will take over. Thrown away result:" + ByteUtilities.ByteArrarysToString(_result.Value));
                State = TaskState.Failed;
                _result = Optional<byte[]>.Empty();
            }
        }

        // Sends the current status to the heartbeat manager.
        private void Heartbeat()
        {
            _heartBeatManager.OnNext(ToProto());
        }

        // Maps the internal state onto the protocol state; CloseRequested and SuspendRequested
        // are reported as RUNNING.
        private State GetProtoState()
        {
            switch (_state)
            {
                case TaskState.Init:
                    return ProtoBuf.ReefServiceProto.State.INIT;
                case TaskState.CloseRequested:
                case TaskState.SuspendRequested:
                case TaskState.Running:
                    return ProtoBuf.ReefServiceProto.State.RUNNING;
                case TaskState.Done:
                    return ProtoBuf.ReefServiceProto.State.DONE;
                case TaskState.Suspended:
                    return ProtoBuf.ReefServiceProto.State.SUSPEND;
                case TaskState.Failed:
                    return ProtoBuf.ReefServiceProto.State.FAILED;
                case TaskState.Killed:
                    return ProtoBuf.ReefServiceProto.State.KILLED;
                default:
                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("Unknown state: " + _state), LOGGER);
                    break;
            }
            return ProtoBuf.ReefServiceProto.State.FAILED; //this line should not be reached as default case will throw exception
        }

        // Collects the message currently offered by each message source, skipping sources that
        // have none; returns an empty collection when no sources were supplied.
        private ICollection<TaskMessage> GetMessages()
        {
            List<TaskMessage> collected = new List<TaskMessage>();
            if (!_evaluatorMessageSources.IsPresent())
            {
                return collected;
            }

            foreach (ITaskMessageSource messageSource in _evaluatorMessageSources.Value)
            {
                Optional<TaskMessage> candidate = messageSource.Message;
                if (candidate.IsPresent())
                {
                    collected.Add(candidate.Value);
                }
            }
            return collected;
        }
    }
}
b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStopImpl.cs @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.Reef.Common.Task; +using Org.Apache.Reef.Tasks.Events; + +namespace Org.Apache.Reef.Common +{ + public class TaskStopImpl : ITaskStop + { + //INJECT + public TaskStopImpl(string id) + { + Id = id; + } + + public string Id { get; set; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/utils/EvaluatorConfigurations.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/utils/EvaluatorConfigurations.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/utils/EvaluatorConfigurations.cs new file mode 100644 index 0000000..3154541 --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/utils/EvaluatorConfigurations.cs @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.Reef.Utilities.Logging; +using Org.Apache.Reef.Tang.Formats; +using System; +using System.IO; +using System.Linq; + +namespace Org.Apache.Reef.Common +{ + public class EvaluatorConfigurations + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorConfigurations)); + + private AvroConfiguration _avroConfiguration; + + private string _configFile; + + private string _applicationId; + + private string _evaluatorId; + + private string _taskConfiguration; + + private string _rootContextConfiguration; + + private string _rootServiceConfiguration; + + public EvaluatorConfigurations(string configFile) + { + using (LOGGER.LogFunction("EvaluatorConfigurations::EvaluatorConfigurations")) + { + if (string.IsNullOrWhiteSpace(configFile)) + { + Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("configFile"), LOGGER); + } + if (!File.Exists(configFile)) + { + Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new FileNotFoundException("cannot find file " + configFile), LOGGER); + } + _configFile = configFile; + AvroConfigurationSerializer serializer = new AvroConfigurationSerializer(); + _avroConfiguration = serializer.AvroDeseriaizeFromFile(_configFile); + } + } + + public string TaskConfiguration + { + get 
+ { + _taskConfiguration = _taskConfiguration ?? GetSettingValue(Reef.Evaluator.Constants.TaskConfiguration); + return _taskConfiguration; + } + } + + public string EvaluatorId + { + get + { + _evaluatorId = _evaluatorId ?? GetSettingValue(Reef.Evaluator.Constants.EvaluatorIdentifier); + return _evaluatorId; + } + } + + public string ApplicationId + { + get + { + _applicationId = _applicationId ?? GetSettingValue(Reef.Evaluator.Constants.ApplicationIdentifier); + return _applicationId; + } + } + + public string RootContextConfiguration + { + get + { + _rootContextConfiguration = _rootContextConfiguration ?? GetSettingValue(Reef.Evaluator.Constants.RootContextConfiguration); + return _rootContextConfiguration; + } + } + + public string RootServiceConfiguration + { + get + { + _rootServiceConfiguration = _rootServiceConfiguration ?? GetSettingValue(Reef.Evaluator.Constants.RootServiceConfiguration); + return _rootServiceConfiguration; + } + } + + private string GetSettingValue(string settingKey) + { + ConfigurationEntry configurationEntry = + _avroConfiguration.Bindings.SingleOrDefault(b => b.key.EndsWith(settingKey, StringComparison.OrdinalIgnoreCase)); + if (configurationEntry == null) + { + return string.Empty; + } + + return configurationEntry.value; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/utils/RemoteManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/utils/RemoteManager.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/utils/RemoteManager.cs new file mode 100644 index 0000000..4e68186 --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/utils/RemoteManager.cs @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.Reef.Common +{ + public class RemoteManager + { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/services/IService.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/services/IService.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/services/IService.cs new file mode 100644 index 0000000..cbcebf8 --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefCommon/services/IService.cs @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.Reef.Services +{ + public interface IService + { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/services/ServiceConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/services/ServiceConfiguration.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/services/ServiceConfiguration.cs new file mode 100644 index 0000000..c51ef40 --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefCommon/services/ServiceConfiguration.cs @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using Org.Apache.Reef.Tang.Annotations; +using Org.Apache.Reef.Tang.Formats; +using Org.Apache.Reef.Tang.Interface; +using Org.Apache.Reef.Tang.Util; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; + +[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1401:FieldsMustBePrivate", Justification = "static field, typical usage in configurations")] + +namespace Org.Apache.Reef.Services +{ + /// <summary> + /// Configuration module for services. The configuration created here can be passed alongside a ContextConfiguration + /// to form a context. Different from bindings made in the ContextConfiguration, those made here will be passed along + /// to child context. + /// </summary> + public class ServiceConfiguration : ConfigurationModuleBuilder + { + /// <summary> + /// A set of services to instantiate. All classes given here will be instantiated in the context, and their references + /// will be made available to child context and tasks. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalParameter<IService> Services = new OptionalParameter<IService>(); + + public ServiceConfiguration() + : base() + { + } + + public ServiceConfiguration(string config) + { + TangConfig = new AvroConfigurationSerializer().FromString(config); + } + + public static ConfigurationModule ConfigurationModule + { + get + { + return new ServiceConfiguration() + .BindSetEntry(GenericType<ServicesSet>.Class, Services) + .Build(); + } + } + + public IConfiguration TangConfig { get; private set; } + } + + public class InjectedServices + { + [Inject] + public InjectedServices([Parameter(typeof(ServicesSet))] ISet<IService> services) + { + Services = services; + } + + public ISet<IService> Services { get; set; } + } + + [NamedParameter("Set of services", "servicesSet", "")] + class ServicesSet : Name<ISet<IService>> + { + 
} +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/services/ServicesConfigurationOptions.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/services/ServicesConfigurationOptions.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/services/ServicesConfigurationOptions.cs new file mode 100644 index 0000000..30a38de --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefCommon/services/ServicesConfigurationOptions.cs @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using Org.Apache.Reef.Tang.Annotations; + +namespace Org.Apache.Reef.Services +{ + public class ServicesConfigurationOptions + { + [NamedParameter("Services", "services", "services")] + public class Services : Name<string> + { + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/IDriverMessageHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/IDriverMessageHandler.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/IDriverMessageHandler.cs new file mode 100644 index 0000000..a0111b8 --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/IDriverMessageHandler.cs @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using Org.Apache.Reef.Tasks.Defaults; +using Org.Apache.Reef.Tasks.Events; +using Org.Apache.Reef.Tang.Annotations; + +namespace Org.Apache.Reef.Tasks +{ + //[DefaultImplementation(typeof(DefaultTaskMessageSource))] + public interface IDriverMessageHandler + { + void Handle(IDriverMessage message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/IRunningTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/IRunningTask.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/IRunningTask.cs new file mode 100644 index 0000000..92a6887 --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/IRunningTask.cs @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.Reef.Utilities; +using System; + +namespace Org.Apache.Reef.Common.Task +{ + /// <summary> + /// Represents a running Task + /// </summary> + public interface IRunningTask : IIdentifiable, IDisposable + { + /// <summary> + /// Sends the message to the running task. 
+ /// </summary> + /// <param name="message">the message to send to the running task.</param> + void OnNext(byte[] message); + + /// <summary> + /// Signal the task to suspend. + /// </summary> + /// <param name="message">a message that is sent to the Task.</param> + void Suspend(byte[] message); + + /// <summary> + /// Signal the task to suspend, without sending it a message. + /// </summary> + void Suspend(); + + /// <summary> + /// Signal the task to shut down. + /// </summary> + /// <param name="message">a message that is sent to the Task.</param> + void Dispose(byte[] message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/ITask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/ITask.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/ITask.cs new file mode 100644 index 0000000..3655a4b --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/ITask.cs @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System; + +namespace Org.Apache.Reef.Tasks +{ + public interface ITask : IDisposable + { + byte[] Call(byte[] memento); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/ITaskMessageSource.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/ITaskMessageSource.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/ITaskMessageSource.cs new file mode 100644 index 0000000..589a445 --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/ITaskMessageSource.cs @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using Org.Apache.Reef.Tasks.Defaults; +using Org.Apache.Reef.Utilities; +using Org.Apache.Reef.Tang.Annotations; + +namespace Org.Apache.Reef.Tasks +{ + [DefaultImplementation(typeof(DefaultTaskMessageSource))] + public interface ITaskMessageSource + { + Optional<TaskMessage> Message { get; set; } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskConfiguration.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskConfiguration.cs new file mode 100644 index 0000000..6fb386b --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskConfiguration.cs @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using Org.Apache.Reef.Tasks.Events; +using Org.Apache.Reef.Utilities.Logging; +using Org.Apache.Reef.Tang.Formats; +using Org.Apache.Reef.Tang.Interface; +using Org.Apache.Reef.Tang.Util; +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Globalization; + +[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1401:FieldsMustBePrivate", Justification = "static readonly field, typical usage in configurations")] + +namespace Org.Apache.Reef.Tasks +{ + public class TaskConfiguration : ConfigurationModuleBuilder + { + // this is a hack for getting the task identifier for now + public const string TaskIdentifier = "TaskConfigurationOptions+Identifier"; + + /// <summary> + /// The identifier of the task. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly RequiredParameter<string> Identifier = new RequiredParameter<string>(); + + /// <summary> + /// The task to instantiate. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly RequiredImpl<ITask> Task = new RequiredImpl<ITask>(); + + /// <summary> + /// for task suspension. Defaults to task failure if not bound. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<ISuspendEvent>> OnSuspend = new OptionalImpl<IObserver<ISuspendEvent>>(); + + /// <summary> + /// for messages from the driver. Defaults to task failure if not bound. 
+ /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IDriverMessageHandler> OnMessage = new OptionalImpl<IDriverMessageHandler>(); + + /// <summary> + /// for closure requests from the driver. Defaults to task failure if not bound. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<ICloseEvent>> OnClose = new OptionalImpl<IObserver<ICloseEvent>>(); + + /// <summary> + /// Message source invoked upon each evaluator heartbeat. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<ITaskMessageSource> OnSendMessage = new OptionalImpl<ITaskMessageSource>(); + + /// <summary> + /// to receive TaskStart after the Task.call() method was called. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<ITaskStart>> OnTaskStart = new OptionalImpl<IObserver<ITaskStart>>(); + + /// <summary> + /// to receive TaskStop after the Task.call() method returned. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<ITaskStop>> OnTaskStop = new OptionalImpl<IObserver<ITaskStop>>(); + + /// <summary> + /// The memento to be passed to Task.call(). 
+ /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalParameter<string> Memento = new OptionalParameter<string>(); + + private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskConfiguration)); + + public TaskConfiguration() + : base() + { + } + + public TaskConfiguration(string configString) + { + TangConfig = new AvroConfigurationSerializer().FromString(configString); + AvroConfiguration avroConfiguration = AvroConfiguration.GetAvroConfigurationFromEmbeddedString(configString); + foreach (ConfigurationEntry config in avroConfiguration.Bindings) + { + if (config.key.Contains(TaskIdentifier)) + { + TaskId = config.value; + } + } + if (string.IsNullOrWhiteSpace(TaskId)) + { + string msg = "Required parameter TaskId not provided."; + LOGGER.Log(Level.Error, msg); + Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new ArgumentException(msg), LOGGER); + } + } + + public static ConfigurationModule ConfigurationModule + { + get + { + return new TaskConfiguration() + .BindImplementation(GenericType<ITask>.Class, Task) + .BindImplementation(GenericType<ITaskMessageSource>.Class, OnSendMessage) + .BindImplementation(GenericType<IDriverMessageHandler>.Class, OnMessage) + .BindNamedParameter(GenericType<TaskConfigurationOptions.Identifier>.Class, Identifier) + .BindNamedParameter(GenericType<TaskConfigurationOptions.Memento>.Class, Memento) + .BindNamedParameter(GenericType<TaskConfigurationOptions.CloseHandler>.Class, OnClose) + .BindNamedParameter(GenericType<TaskConfigurationOptions.SuspendHandler>.Class, OnSuspend) + .BindSetEntry(GenericType<TaskConfigurationOptions.StartHandlers>.Class, OnTaskStart) + .BindSetEntry(GenericType<TaskConfigurationOptions.StopHandlers>.Class, OnTaskStop) + .Build(); + } + } + + public string TaskId { get; private set; } + + public IList<KeyValuePair<string, string>> Configurations { get; private 
set; } + + public IConfiguration TangConfig { get; private set; } + + public override string ToString() + { + return string.Format(CultureInfo.InvariantCulture, "TaskConfiguration - configurations: {0}", TangConfig.ToString()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskConfigurationOptions.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskConfigurationOptions.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskConfigurationOptions.cs new file mode 100644 index 0000000..888faf0 --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskConfigurationOptions.cs @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using Org.Apache.Reef.Tasks.Events; +using Org.Apache.Reef.Tang.Annotations; +using System; +using System.Collections.Generic; + +namespace Org.Apache.Reef.Tasks +{ + public class TaskConfigurationOptions + { + [NamedParameter("The Identifier of the Task", "taskid", "Task")] + public class Identifier : Name<string> + { + } + + [NamedParameter(documentation: "The memento to be used for the Task")] + public class Memento : Name<string> + { + } + + [NamedParameter("TaskMessageSource", "messagesource", null)] + public class TaskMessageSources : Name<ISet<ITaskMessageSource>> + { + } + + [NamedParameter(documentation: "The set of event handlers for the TaskStart event.")] + public class StartHandlers : Name<ISet<IObserver<ITaskStart>>> + { + } + + [NamedParameter(documentation: "The set of event handlers for the TaskStop event.")] + public class StopHandlers : Name<ISet<IObserver<ITaskStop>>> + { + } + + [NamedParameter(documentation: "The event handler that receives the close event.")] + public class CloseHandler : Name<IObserver<ICloseEvent>> + { + } + + [NamedParameter(documentation: "The event handler that receives the suspend event.")] + public class SuspendHandler : Name<IObserver<ISuspendEvent>> + { + } + + [NamedParameter(documentation: "The event handler that receives messages from the driver.")] + public class MessageHandler : Name<IObserver<IDriverMessage>> + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskMessage.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskMessage.cs new file mode 100644 index 0000000..20defce --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskMessage.cs @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.Reef.Utilities; +using System; +using Org.Apache.Reef.Utilities.Logging; + +namespace Org.Apache.Reef.Tasks +{ + /// <summary> + /// A message made of a source id and a byte[] payload. Instances are created through <see cref="From"/>. + /// </summary> + public class TaskMessage : IMessage + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskMessage)); + private readonly string _messageSourceId; + private readonly byte[] _bytes; + + private TaskMessage(string messageSourceId, byte[] bytes) + { + _messageSourceId = messageSourceId; + _bytes = bytes; + } + + /// <summary> + /// The source id this message was created with. + /// </summary> + public string MessageSourceId + { + get { return _messageSourceId; } + } + + /// <summary> + /// The payload this message was created with. The setter is a no-op: assigned values are discarded. + /// </summary> + public byte[] Message + { + get { return _bytes; } + set { } + } + + /// <summary> + /// From byte[] message to a TaskMessage + /// </summary> + /// <param name="messageSourceId">The message's sourceID. 
This will be accessible in the Driver for routing</param> + /// <param name="message">The actual content of the message, serialized into a byte[]</param> + /// <returns>a new TaskMessage with the given content</returns> + public static TaskMessage From(string messageSourceId, byte[] message) + { + if (string.IsNullOrEmpty(messageSourceId)) + { + Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("messageSourceId"), LOGGER); + } + if (message == null) + { + Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("message"), LOGGER); + } + return new TaskMessage(messageSourceId, message); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/defaults/DefaultDriverMessageHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/defaults/DefaultDriverMessageHandler.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/defaults/DefaultDriverMessageHandler.cs new file mode 100644 index 0000000..f77e18d --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/defaults/DefaultDriverMessageHandler.cs @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.Reef.Tasks.Events; +using Org.Apache.Reef.Utilities.Diagnostics; +using Org.Apache.Reef.Utilities.Logging; +using Org.Apache.Reef.Tang.Annotations; +using System; + +namespace Org.Apache.Reef.Tasks.Defaults +{ + /// <summary> + /// Driver message handler whose <see cref="Handle"/> reports an InvalidOperationException stating that no DriverMessage handler is bound. + /// </summary> + public class DefaultDriverMessageHandler : IDriverMessageHandler + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(DefaultDriverMessageHandler)); + + [Inject] + public DefaultDriverMessageHandler() + { + } + + /// <summary> + /// Passes an InvalidOperationException that describes the received message to Exceptions.Throw. + /// </summary> + /// <param name="message">The received driver message; its string form is appended to the error text.</param> + public void Handle(IDriverMessage message) + { + // Separator keeps the fixed text and the message description from running together. + Exceptions.Throw(new InvalidOperationException("No DriverMessage handler bound. Message received: " + message), LOGGER); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/defaults/DefaultTaskMessageSource.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/defaults/DefaultTaskMessageSource.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/defaults/DefaultTaskMessageSource.cs new file mode 100644 index 0000000..97b52db --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/defaults/DefaultTaskMessageSource.cs @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.Reef.Utilities; +using Org.Apache.Reef.Tang.Annotations; +using System; +using System.Globalization; + +namespace Org.Apache.Reef.Tasks.Defaults +{ + /// <summary> + /// Task message source whose <see cref="Message"/> getter builds a new TaskMessage on every read. + /// </summary> + public class DefaultTaskMessageSource : ITaskMessageSource + { + [Inject] + public DefaultTaskMessageSource() + { + } + + /// <summary> + /// Reading yields a TaskMessage with source id "defaultSourceId" whose payload is a text stamped with DateTime.Now in the invariant culture. The setter body is empty, so assigned values are ignored. + /// </summary> + public Optional<TaskMessage> Message + { + get + { + // Build the text first, then wrap it step by step: bytes, TaskMessage, Optional. + string text = "default message generated at " + DateTime.Now.ToString(CultureInfo.InvariantCulture); + byte[] payload = ByteUtilities.StringToByteArrays(text); + return Optional<TaskMessage>.Of(TaskMessage.From("defaultSourceId", payload)); + } + + set + { + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ICloseEvent.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ICloseEvent.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ICloseEvent.cs new file mode 100644 index 0000000..3ff9ccf --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ICloseEvent.cs @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.Reef.Utilities; + +namespace Org.Apache.Reef.Tasks.Events +{ + /// <summary> + /// Close event that exposes an optional byte[] value. + /// </summary> + public interface ICloseEvent + { + /// <summary> + /// Optional byte[] carried by the event; readable and writable. + /// </summary> + Optional<byte[]> Value { get; set; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/IDriverMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/IDriverMessage.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/IDriverMessage.cs new file mode 100644 index 0000000..9ac120d --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/IDriverMessage.cs @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using Org.Apache.Reef.Utilities; + +namespace Org.Apache.Reef.Tasks.Events +{ + /// <summary> + /// Driver message that exposes an optional byte[] payload. + /// </summary> + public interface IDriverMessage + { + /// <summary> + /// Optional byte[] content of the message; read-only. + /// </summary> + Optional<byte[]> Message { get; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ISuspendEvent.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ISuspendEvent.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ISuspendEvent.cs new file mode 100644 index 0000000..218a28f --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ISuspendEvent.cs @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +namespace Org.Apache.Reef.Tasks.Events +{ + /// <summary> + /// Suspend event type. The interface declares no members. + /// </summary> + public interface ISuspendEvent + { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ITaskStart.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ITaskStart.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ITaskStart.cs new file mode 100644 index 0000000..bcde0b3 --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ITaskStart.cs @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +namespace Org.Apache.Reef.Tasks.Events +{ + /// <summary> + /// Task start event type exposing a string id. + /// </summary> + public interface ITaskStart + { + /// <summary> + /// String id; readable and writable. Presumably the id of the task being started - confirm with implementers. + /// </summary> + string Id { get; set; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ITaskStop.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ITaskStop.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ITaskStop.cs new file mode 100644 index 0000000..5e8ebc8 --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ITaskStop.cs @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.Reef.Tasks.Events +{ + /// <summary> + /// Task stop event type exposing a string id. + /// </summary> + public interface ITaskStop + { + /// <summary> + /// String id; read-only. Presumably the id of the task being stopped - confirm with implementers. + /// </summary> + string Id { get; } + } +}
