http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskState.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskState.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskState.cs new file mode 100644 index 0000000..258bc24 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskState.cs @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.REEF.Common +{ + public enum TaskState + { + Init = 0, + + Running = 1, + + CloseRequested = 2, + + SuspendRequested = 3, + + Suspended = 4, + + Failed = 5, + + Done = 6, + + Killed = 7 + } +}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskStatus.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskStatus.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskStatus.cs new file mode 100644 index 0000000..ba00262 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskStatus.cs @@ -0,0 +1,330 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto; +using Org.Apache.REEF.Tasks; +using Org.Apache.REEF.Utilities; +using Org.Apache.REEF.Utilities.Logging; +using System; +using System.Collections.Generic; +using System.Globalization; + +namespace Org.Apache.REEF.Common +{ + public class TaskStatus + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskStatus)); + private readonly TaskLifeCycle _taskLifeCycle; + private readonly HeartBeatManager _heartBeatManager; + private readonly Optional<ISet<ITaskMessageSource>> _evaluatorMessageSources; + + private string _taskId; + private string _contextId; + private Optional<Exception> _lastException = Optional<Exception>.Empty(); + private Optional<byte[]> _result = Optional<byte[]>.Empty(); + private TaskState _state; + + public TaskStatus(HeartBeatManager heartBeatManager, string contextId, string taskId, Optional<ISet<ITaskMessageSource>> evaluatorMessageSources) + { + _contextId = contextId; + _taskId = taskId; + _heartBeatManager = heartBeatManager; + _taskLifeCycle = new TaskLifeCycle(); + _evaluatorMessageSources = evaluatorMessageSources; + State = TaskState.Init; + } + + public TaskState State + { + get + { + return _state; + } + + set + { + if (IsLegalStateTransition(_state, value)) + { + _state = value; + } + else + { + string message = string.Format(CultureInfo.InvariantCulture, "Illegal state transition from [{0}] to [{1}]", _state, value); + LOGGER.Log(Level.Error, message); + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(message), LOGGER); + } + } + } + + public string TaskId + { + get { return _taskId; } + } + + public string ContextId + { + get { return _contextId; } + } + + public void SetException(Exception e) + { + RecordExecptionWithoutHeartbeat(e); + Heartbeat(); + _lastException = Optional<Exception>.Empty(); + } + + public void SetResult(byte[] result) + { + _result = Optional<byte[]>.OfNullable(result); + if (State == TaskState.Running) + { + State = TaskState.Done; + } + else if (State == TaskState.SuspendRequested) + { + State = TaskState.Suspended; + } + else if (State == TaskState.CloseRequested) + { + State = TaskState.Done; + } + _taskLifeCycle.Stop(); + Heartbeat(); + } + + public void SetRunning() + { + LOGGER.Log(Level.Verbose, "TaskStatus::SetRunning"); + if (_state == TaskState.Init) + { + try + { + _taskLifeCycle.Start(); + // Need to send an INIT heartbeat to the driver prompting it to create an RunningTask event. + LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Sending task INIT heartbeat")); + Heartbeat(); + State = TaskState.Running; + } + catch (Exception e) + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Cannot set task status to running.", LOGGER); + SetException(e); + } + } + } + + public void SetCloseRequested() + { + State = TaskState.CloseRequested; + } + + public void SetSuspendRequested() + { + State = TaskState.SuspendRequested; + } + + public void SetKilled() + { + State = TaskState.Killed; + Heartbeat(); + } + + public bool IsNotRunning() + { + return _state != TaskState.Running; + } + + public bool HasEnded() + { + switch (_state) + { + case TaskState.Done: + case TaskState.Suspended: + case TaskState.Failed: + case TaskState.Killed: + return true; + default: + return false; + } + } + + public TaskStatusProto ToProto() + { + Check(); + TaskStatusProto taskStatusProto = new TaskStatusProto() + { + context_id = _contextId, + task_id = _taskId, + state = GetProtoState(), + }; + if (_result.IsPresent()) + { + taskStatusProto.result = ByteUtilities.CopyBytesFrom(_result.Value); + } + else if (_lastException.IsPresent()) + { + //final Encoder<Throwable> codec = new ObjectSerializableCodec<>(); + //final byte[] error = codec.encode(_lastException.get()); + byte[] error = ByteUtilities.StringToByteArrays(_lastException.Value.ToString()); + taskStatusProto.result = ByteUtilities.CopyBytesFrom(error); + } + else if (_state == TaskState.Running) + { + foreach (TaskMessage message in GetMessages()) + { + TaskStatusProto.TaskMessageProto taskMessageProto = new TaskStatusProto.TaskMessageProto() + { + source_id = message.MessageSourceId, + message = ByteUtilities.CopyBytesFrom(message.Message), + }; + taskStatusProto.task_message.Add(taskMessageProto); + } + } + return taskStatusProto; + } + + internal void RecordExecptionWithoutHeartbeat(Exception e) + { + if (!_lastException.IsPresent()) + { + _lastException = Optional<Exception>.Of(e); + } + State = TaskState.Failed; + _taskLifeCycle.Stop(); + } + + private static bool IsLegalStateTransition(TaskState? from, TaskState to) + { + if (from == null) + { + return to == TaskState.Init; + } + switch (from) + { + case TaskState.Init: + switch (to) + { + case TaskState.Init: + case TaskState.Running: + case TaskState.Failed: + case TaskState.Killed: + case TaskState.Done: + return true; + default: + return false; + } + case TaskState.Running: + switch (to) + { + case TaskState.CloseRequested: + case TaskState.SuspendRequested: + case TaskState.Failed: + case TaskState.Killed: + case TaskState.Done: + return true; + default: + return false; + } + case TaskState.CloseRequested: + switch (to) + { + case TaskState.Failed: + case TaskState.Killed: + case TaskState.Done: + return true; + default: + return false; + } + case TaskState.SuspendRequested: + switch (to) + { + case TaskState.Failed: + case TaskState.Killed: + case TaskState.Suspended: + return true; + default: + return false; + } + + case TaskState.Failed: + case TaskState.Done: + case TaskState.Killed: + default: + return true; + } + } + + private void Check() + { + if (_result.IsPresent() && _lastException.IsPresent()) + { + LOGGER.Log(Level.Warning, "Both task result and exception are present, the expcetion will take over. Thrown away result:" + ByteUtilities.ByteArrarysToString(_result.Value)); + State = TaskState.Failed; + _result = Optional<byte[]>.Empty(); + } + } + + private void Heartbeat() + { + _heartBeatManager.OnNext(ToProto()); + } + + private State GetProtoState() + { + switch (_state) + { + case TaskState.Init: + return ProtoBuf.ReefServiceProto.State.INIT; + case TaskState.CloseRequested: + case TaskState.SuspendRequested: + case TaskState.Running: + return ProtoBuf.ReefServiceProto.State.RUNNING; + case TaskState.Done: + return ProtoBuf.ReefServiceProto.State.DONE; + case TaskState.Suspended: + return ProtoBuf.ReefServiceProto.State.SUSPEND; + case TaskState.Failed: + return ProtoBuf.ReefServiceProto.State.FAILED; + case TaskState.Killed: + return ProtoBuf.ReefServiceProto.State.KILLED; + default: + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("Unknown state: " + _state), LOGGER); + break; + } + return ProtoBuf.ReefServiceProto.State.FAILED; //this line should not be reached as default case will throw exception + } + + private ICollection<TaskMessage> GetMessages() + { + List<TaskMessage> result = new List<TaskMessage>(); + if (_evaluatorMessageSources.IsPresent()) + { + foreach (ITaskMessageSource source in _evaluatorMessageSources.Value) + { + Optional<TaskMessage> taskMessageOptional = source.Message; + if (taskMessageOptional.IsPresent()) + { + result.Add(taskMessageOptional.Value); + } + } + } + return result; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskStopImpl.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskStopImpl.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskStopImpl.cs new file mode 100644 index 0000000..397411b --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskStopImpl.cs @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Common.Task; +using Org.Apache.REEF.Tasks.Events; + +namespace Org.Apache.REEF.Common +{ + public class TaskStopImpl : ITaskStop + { + //INJECT + public TaskStopImpl(string id) + { + Id = id; + } + + public string Id { get; set; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/utils/EvaluatorConfigurations.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/utils/EvaluatorConfigurations.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/utils/EvaluatorConfigurations.cs new file mode 100644 index 0000000..0125128 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/utils/EvaluatorConfigurations.cs @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Tang.Formats; +using System; +using System.IO; +using System.Linq; + +namespace Org.Apache.REEF.Common +{ + public class EvaluatorConfigurations + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorConfigurations)); + + private AvroConfiguration _avroConfiguration; + + private string _configFile; + + private string _applicationId; + + private string _evaluatorId; + + private string _taskConfiguration; + + private string _rootContextConfiguration; + + private string _rootServiceConfiguration; + + public EvaluatorConfigurations(string configFile) + { + using (LOGGER.LogFunction("EvaluatorConfigurations::EvaluatorConfigurations")) + { + if (string.IsNullOrWhiteSpace(configFile)) + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("configFile"), LOGGER); + } + if (!File.Exists(configFile)) + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new FileNotFoundException("cannot find file " + configFile), LOGGER); + } + _configFile = configFile; + AvroConfigurationSerializer serializer = new AvroConfigurationSerializer(); + _avroConfiguration = serializer.AvroDeseriaizeFromFile(_configFile); + } + } + + public string TaskConfiguration + { + get + { + _taskConfiguration = _taskConfiguration ?? GetSettingValue(REEF.Evaluator.Constants.TaskConfiguration); + return _taskConfiguration; + } + } + + public string EvaluatorId + { + get + { + _evaluatorId = _evaluatorId ?? GetSettingValue(REEF.Evaluator.Constants.EvaluatorIdentifier); + return _evaluatorId; + } + } + + public string ApplicationId + { + get + { + _applicationId = _applicationId ?? GetSettingValue(REEF.Evaluator.Constants.ApplicationIdentifier); + return _applicationId; + } + } + + public string RootContextConfiguration + { + get + { + _rootContextConfiguration = _rootContextConfiguration ?? GetSettingValue(REEF.Evaluator.Constants.RootContextConfiguration); + return _rootContextConfiguration; + } + } + + public string RootServiceConfiguration + { + get + { + _rootServiceConfiguration = _rootServiceConfiguration ?? GetSettingValue(REEF.Evaluator.Constants.RootServiceConfiguration); + return _rootServiceConfiguration; + } + } + + private string GetSettingValue(string settingKey) + { + ConfigurationEntry configurationEntry = + _avroConfiguration.Bindings.SingleOrDefault(b => b.key.EndsWith(settingKey, StringComparison.OrdinalIgnoreCase)); + if (configurationEntry == null) + { + return string.Empty; + } + + return configurationEntry.value; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/utils/RemoteManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/utils/RemoteManager.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/utils/RemoteManager.cs new file mode 100644 index 0000000..c9bcb56 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/utils/RemoteManager.cs @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.REEF.Common +{ + public class RemoteManager + { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/services/IService.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/services/IService.cs b/lang/cs/Org.Apache.REEF.Common/services/IService.cs new file mode 100644 index 0000000..016291c --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/services/IService.cs @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.REEF.Services +{ + public interface IService + { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/services/ServiceConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/services/ServiceConfiguration.cs b/lang/cs/Org.Apache.REEF.Common/services/ServiceConfiguration.cs new file mode 100644 index 0000000..5331709 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/services/ServiceConfiguration.cs @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; + +[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1401:FieldsMustBePrivate", Justification = "static field, typical usage in configurations")] + +namespace Org.Apache.REEF.Services +{ + /// <summary> + /// Configuration module for services. The configuration created here can be passed alongside a ContextConfiguration + /// to form a context. Different from bindings made in the ContextConfiguration, those made here will be passed along + /// to child context. + /// </summary> + public class ServiceConfiguration : ConfigurationModuleBuilder + { + /// <summary> + /// A set of services to instantiate. All classes given here will be instantiated in the context, and their references + /// will be made available to child context and tasks. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalParameter<IService> Services = new OptionalParameter<IService>(); + + public ServiceConfiguration() + : base() + { + } + + public ServiceConfiguration(string config) + { + TangConfig = new AvroConfigurationSerializer().FromString(config); + } + + public static ConfigurationModule ConfigurationModule + { + get + { + return new ServiceConfiguration() + .BindSetEntry(GenericType<ServicesSet>.Class, Services) + .Build(); + } + } + + public IConfiguration TangConfig { get; private set; } + } + + public class InjectedServices + { + [Inject] + public InjectedServices([Parameter(typeof(ServicesSet))] ISet<IService> services) + { + Services = services; + } + + public ISet<IService> Services { get; set; } + } + + [NamedParameter("Set of services", "servicesSet", "")] + class ServicesSet : Name<ISet<IService>> + { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/services/ServicesConfigurationOptions.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/services/ServicesConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Common/services/ServicesConfigurationOptions.cs new file mode 100644 index 0000000..31a206a --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/services/ServicesConfigurationOptions.cs @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Services +{ + public class ServicesConfigurationOptions + { + [NamedParameter("Services", "services", "services")] + public class Services : Name<string> + { + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/tasks/IDriverMessageHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/tasks/IDriverMessageHandler.cs b/lang/cs/Org.Apache.REEF.Common/tasks/IDriverMessageHandler.cs new file mode 100644 index 0000000..4c5d42f --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/tasks/IDriverMessageHandler.cs @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tasks.Defaults; +using Org.Apache.REEF.Tasks.Events; +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Tasks +{ + //[DefaultImplementation(typeof(DefaultTaskMessageSource))] + public interface IDriverMessageHandler + { + void Handle(IDriverMessage message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/tasks/IRunningTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/tasks/IRunningTask.cs b/lang/cs/Org.Apache.REEF.Common/tasks/IRunningTask.cs new file mode 100644 index 0000000..1e76d1d --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/tasks/IRunningTask.cs @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Utilities; +using System; + +namespace Org.Apache.REEF.Common.Task +{ + /// <summary> + /// Represents a running Task + /// </summary> + public interface IRunningTask : IIdentifiable, IDisposable + { + /// <summary> + /// Sends the message to the running task. + /// </summary> + /// <param name="message"></param> + void OnNext(byte[] message); + + /// <summary> + /// Signal the task to suspend. + /// </summary> + /// <param name="message">a message that is sent to the Task.</param> + void Suspend(byte[] message); + + /// <summary> + /// Sends the message to the running task. + /// </summary> + void Suspend(); + + /// <summary> + /// Signal the task to shut down. + /// </summary> + /// <param name="message">a message that is sent to the Task.</param> + void Dispose(byte[] message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/tasks/ITask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/tasks/ITask.cs b/lang/cs/Org.Apache.REEF.Common/tasks/ITask.cs new file mode 100644 index 0000000..42cc10e --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/tasks/ITask.cs @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; + +namespace Org.Apache.REEF.Tasks +{ + public interface ITask : IDisposable + { + byte[] Call(byte[] memento); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/tasks/ITaskMessageSource.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/tasks/ITaskMessageSource.cs b/lang/cs/Org.Apache.REEF.Common/tasks/ITaskMessageSource.cs new file mode 100644 index 0000000..141ba89 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/tasks/ITaskMessageSource.cs @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tasks.Defaults; +using Org.Apache.REEF.Utilities; +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Tasks +{ + [DefaultImplementation(typeof(DefaultTaskMessageSource))] + public interface ITaskMessageSource + { + Optional<TaskMessage> Message { get; set; } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/tasks/TaskConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/tasks/TaskConfiguration.cs b/lang/cs/Org.Apache.REEF.Common/tasks/TaskConfiguration.cs new file mode 100644 index 0000000..cd82d2a --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/tasks/TaskConfiguration.cs @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tasks.Events; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Globalization; + +[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1401:FieldsMustBePrivate", Justification = "static readonly field, typical usage in configurations")] + +namespace Org.Apache.REEF.Tasks +{ + public class TaskConfiguration : ConfigurationModuleBuilder + { + // this is a hack for getting the task identifier for now + public const string TaskIdentifier = "TaskConfigurationOptions+Identifier"; + + /// <summary> + /// The identifier of the task. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly RequiredParameter<string> Identifier = new RequiredParameter<string>(); + + /// <summary> + /// The task to instantiate. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly RequiredImpl<ITask> Task = new RequiredImpl<ITask>(); + + /// <summary> + /// for task suspension. Defaults to task failure if not bound. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<ISuspendEvent>> OnSuspend = new OptionalImpl<IObserver<ISuspendEvent>>(); + + /// <summary> + /// for messages from the driver. Defaults to task failure if not bound. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IDriverMessageHandler> OnMessage = new OptionalImpl<IDriverMessageHandler>(); + + /// <summary> + /// for closure requests from the driver. Defaults to task failure if not bound. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<ICloseEvent>> OnClose = new OptionalImpl<IObserver<ICloseEvent>>(); + + /// <summary> + /// Message source invoked upon each evaluator heartbeat. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<ITaskMessageSource> OnSendMessage = new OptionalImpl<ITaskMessageSource>(); + + /// <summary> + /// to receive TaskStart after the Task.call() method was called. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<ITaskStart>> OnTaskStart = new OptionalImpl<IObserver<ITaskStart>>(); + + /// <summary> + /// to receive TaskStop after the Task.call() method returned. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<ITaskStop>> OnTaskStop = new OptionalImpl<IObserver<ITaskStop>>(); + + /// <summary> + /// The memento to be passed to Task.call(). + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalParameter<string> Memento = new OptionalParameter<string>(); + + private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskConfiguration)); + + public TaskConfiguration() + : base() + { + } + + public TaskConfiguration(string configString) + { + TangConfig = new AvroConfigurationSerializer().FromString(configString); + AvroConfiguration avroConfiguration = AvroConfiguration.GetAvroConfigurationFromEmbeddedString(configString); + foreach (ConfigurationEntry config in avroConfiguration.Bindings) + { + if (config.key.Contains(TaskIdentifier)) + { + TaskId = config.value; + } + } + if (string.IsNullOrWhiteSpace(TaskId)) + { + string msg = "Required parameter TaskId not provided."; + LOGGER.Log(Level.Error, msg); + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new ArgumentException(msg), LOGGER); + } + } + + public static ConfigurationModule ConfigurationModule + { + get + { + return new TaskConfiguration() + .BindImplementation(GenericType<ITask>.Class, Task) + .BindImplementation(GenericType<ITaskMessageSource>.Class, OnSendMessage) + .BindImplementation(GenericType<IDriverMessageHandler>.Class, OnMessage) + .BindNamedParameter(GenericType<TaskConfigurationOptions.Identifier>.Class, Identifier) + .BindNamedParameter(GenericType<TaskConfigurationOptions.Memento>.Class, Memento) + .BindNamedParameter(GenericType<TaskConfigurationOptions.CloseHandler>.Class, OnClose) + .BindNamedParameter(GenericType<TaskConfigurationOptions.SuspendHandler>.Class, OnSuspend) + .BindSetEntry(GenericType<TaskConfigurationOptions.StartHandlers>.Class, OnTaskStart) + .BindSetEntry(GenericType<TaskConfigurationOptions.StopHandlers>.Class, OnTaskStop) + .Build(); + } + } + + public string TaskId { get; private set; } + + public IList<KeyValuePair<string, string>> Configurations { get; private set; } + + public IConfiguration TangConfig { get; private set; } + + public override string ToString() + { + return string.Format(CultureInfo.InvariantCulture, "TaskConfiguration - configurations: {0}", TangConfig.ToString()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/tasks/TaskConfigurationOptions.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/tasks/TaskConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Common/tasks/TaskConfigurationOptions.cs new file mode 100644 index 0000000..db50d78 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/tasks/TaskConfigurationOptions.cs @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tasks.Events; +using Org.Apache.REEF.Tang.Annotations; +using System; +using System.Collections.Generic; + +namespace Org.Apache.REEF.Tasks +{ + public class TaskConfigurationOptions + { + [NamedParameter("The Identifier of the Task", "taskid", "Task")] + public class Identifier : Name<string> + { + } + + [NamedParameter(documentation: "The memento to be used for the Task")] + public class Memento : Name<string> + { + } + + [NamedParameter("TaskMessageSource", "messagesource", null)] + public class TaskMessageSources : Name<ISet<ITaskMessageSource>> + { + } + + [NamedParameter(documentation: "The set of event handlers for the TaskStart event.")] + public class StartHandlers : Name<ISet<IObserver<ITaskStart>>> + { + } + + [NamedParameter(documentation: "The set of event handlers for the TaskStop event.")] + public class StopHandlers : Name<ISet<IObserver<ITaskStop>>> + { + } + + [NamedParameter(documentation: "The event handler that receives the close event.")] + public class CloseHandler : Name<IObserver<ICloseEvent>> + { + } + + [NamedParameter(documentation: "The event handler that receives the suspend event.")] + public class SuspendHandler : Name<IObserver<ISuspendEvent>> + { + } + + [NamedParameter(documentation: "The event handler that receives messages from the driver.")] + public class MessageHandler : Name<IObserver<IDriverMessage>> + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/tasks/TaskMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/tasks/TaskMessage.cs b/lang/cs/Org.Apache.REEF.Common/tasks/TaskMessage.cs new file mode 100644 index 0000000..bc96016 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/tasks/TaskMessage.cs @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Utilities; +using System; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Tasks +{ + public class TaskMessage : IMessage + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskMessage)); + private readonly string _messageSourcId; + private readonly byte[] _bytes; + + private TaskMessage(string messageSourceId, byte[] bytes) + { + _messageSourcId = messageSourceId; + _bytes = bytes; + } + + public string MessageSourceId + { + get { return _messageSourcId; } + } + + public byte[] Message + { + get { return _bytes; } + set { } + } + + /// <summary> + /// From byte[] message to a TaskMessage + /// </summary> + /// <param name="messageSourceId">messageSourceId The message's sourceID. This will be accessible in the Driver for routing</param> + /// <param name="message">The actual content of the message, serialized into a byte[]</param> + /// <returns>a new TaskMessage with the given content</returns> + public static TaskMessage From(string messageSourceId, byte[] message) + { + if (string.IsNullOrEmpty(messageSourceId)) + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("messageSourceId"), LOGGER); + } + if (message == null) + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("bytes"), LOGGER); + } + return new TaskMessage(messageSourceId, message); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/tasks/defaults/DefaultDriverMessageHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/tasks/defaults/DefaultDriverMessageHandler.cs b/lang/cs/Org.Apache.REEF.Common/tasks/defaults/DefaultDriverMessageHandler.cs new file mode 100644 index 0000000..3749f2b --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/tasks/defaults/DefaultDriverMessageHandler.cs @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tasks.Events; +using Org.Apache.REEF.Utilities.Diagnostics; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Tang.Annotations; +using System; + +namespace Org.Apache.REEF.Tasks.Defaults +{ + public class DefaultDriverMessageHandler : IDriverMessageHandler + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(DefaultDriverMessageHandler)); + + [Inject] + public DefaultDriverMessageHandler() + { + } + + public void Handle(IDriverMessage message) + { + Exceptions.Throw(new InvalidOperationException("No DriverMessage handler bound. Message received" + message), LOGGER); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/tasks/defaults/DefaultTaskMessageSource.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/tasks/defaults/DefaultTaskMessageSource.cs b/lang/cs/Org.Apache.REEF.Common/tasks/defaults/DefaultTaskMessageSource.cs new file mode 100644 index 0000000..4549ab5 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/tasks/defaults/DefaultTaskMessageSource.cs @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Utilities; +using Org.Apache.REEF.Tang.Annotations; +using System; +using System.Globalization; + +namespace Org.Apache.REEF.Tasks.Defaults +{ + public class DefaultTaskMessageSource : ITaskMessageSource + { + [Inject] + public DefaultTaskMessageSource() + { + } + + public Optional<TaskMessage> Message + { + get + { + TaskMessage defaultTaskMessage = TaskMessage.From( + "defaultSourceId", + ByteUtilities.StringToByteArrays("default message generated at " + DateTime.Now.ToString(CultureInfo.InvariantCulture))); + return Optional<TaskMessage>.Of(defaultTaskMessage); + } + + set + { + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/tasks/events/ICloseEvent.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/tasks/events/ICloseEvent.cs b/lang/cs/Org.Apache.REEF.Common/tasks/events/ICloseEvent.cs new file mode 100644 index 0000000..e9737c3 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/tasks/events/ICloseEvent.cs @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Utilities; + +namespace Org.Apache.REEF.Tasks.Events +{ + public interface ICloseEvent + { + Optional<byte[]> Value { get; set; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/tasks/events/IDriverMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/tasks/events/IDriverMessage.cs b/lang/cs/Org.Apache.REEF.Common/tasks/events/IDriverMessage.cs new file mode 100644 index 0000000..a1ead6d --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/tasks/events/IDriverMessage.cs @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Utilities; + +namespace Org.Apache.REEF.Tasks.Events +{ + public interface IDriverMessage + { + Optional<byte[]> Message { get; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/tasks/events/ISuspendEvent.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/tasks/events/ISuspendEvent.cs b/lang/cs/Org.Apache.REEF.Common/tasks/events/ISuspendEvent.cs new file mode 100644 index 0000000..1926c75 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/tasks/events/ISuspendEvent.cs @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.REEF.Tasks.Events +{ + public interface ISuspendEvent + { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/tasks/events/ITaskStart.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/tasks/events/ITaskStart.cs b/lang/cs/Org.Apache.REEF.Common/tasks/events/ITaskStart.cs new file mode 100644 index 0000000..7b15609 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/tasks/events/ITaskStart.cs @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.REEF.Tasks.Events +{ + public interface ITaskStart + { + string Id { get; set; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/tasks/events/ITaskStop.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/tasks/events/ITaskStop.cs b/lang/cs/Org.Apache.REEF.Common/tasks/events/ITaskStop.cs new file mode 100644 index 0000000..62e9254 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/tasks/events/ITaskStop.cs @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.REEF.Tasks.Events +{ + public interface ITaskStop + { + string Id { get; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/ClientManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/ClientManager.cs b/lang/cs/Org.Apache.REEF.Driver/ClientManager.cs new file mode 100644 index 0000000..ebb56b5 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/ClientManager.cs @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Common.ProtoBuf.ClienRuntimeProto; +using System; + +// TODO +namespace Org.Apache.REEF.Driver +{ + public class ClientManager : IObserver<JobControlProto> + { + public void OnNext(JobControlProto value) + { + throw new NotImplementedException(); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/Constants.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Constants.cs b/lang/cs/Org.Apache.REEF.Driver/Constants.cs new file mode 100644 index 0000000..efbe999 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/Constants.cs @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Collections.Generic; + +namespace Org.Apache.REEF.Driver +{ + public class Constants + { + public const ulong NullHandler = 0; + + public const string ClassHierarachyBin = "clrClassHierarchy.bin"; + + public const string GlobalUserSuppliedJavaLibraries = "userSuppliedGlobalLibraries.txt"; + + public const int DefaultMemoryGranularity = 1024; + + public const int HandlersNumber = 17; + + public const string EvaluatorRequestorHandler = "EvaluatorRequestor"; + + public const string AllocatedEvaluatorHandler = "AllocatedEvaluator"; + + public const string CompletedEvaluatorHandler = "CompletedEvaluator"; + + public const string ActiveContextHandler = "ActiveContext"; + + public const string ClosedContextHandler = "ClosedContext"; + + public const string FailedContextHandler = "FailedContext"; + + public const string ContextMessageHandler = "ContextMessage"; + + public const string TaskMessageHandler = "TaskMessage"; + + public const string FailedTaskHandler = "FailedTask"; + + public const string RunningTaskHandler = "RunningTask"; + + public const string FailedEvaluatorHandler = "FailedEvaluator"; + + public const string CompletedTaskHandler = "CompletedTask"; + + public const string SuspendedTaskHandler = "SuspendedTask"; + + public const string HttpServerHandler = "HttpServerHandler"; + + public const string DriverRestartHandler = "DriverRestart"; + + public const string DriverRestartActiveContextHandler = "DriverRestartActiveContext"; + + public const string DriverRestartRunningTaskHandler = "DriverRestartRunningTask"; + + public const string DriverBridgeConfiguration = Common.Constants.ClrBridgeRuntimeConfiguration; + + public const string DriverAppDirectory = "ReefDriverAppDlls"; + + public const string BridgeJarFileName = "reef-bridge-0.11.0-incubating-SNAPSHOT-shaded.jar"; + + public const string BridgeLaunchClass = "org.apache.reef.javabridge.generic.Launch"; + + public const string BridgeLaunchHeadlessClass = "org.apache.reef.javabridge.generic.LaunchHeadless"; + + public const string DirectLauncherClass = "org.apache.reef.runtime.common.Launcher"; + + public const string JavaToCLRLoggingConfig = "-Djava.util.logging.config.class=org.apache.reef.util.logging.CLRLoggingConfig"; + + public const string JavaVerboseLoggingConfig = "-Djava.util.logging.config.class=org.apache.reef.util.logging.Config"; + + public static Dictionary<string, int> Handlers + { + get + { + return + new Dictionary<string, int>() + { + { EvaluatorRequestorHandler, 0 }, + { AllocatedEvaluatorHandler, 1 }, + { ActiveContextHandler, 2 }, + { TaskMessageHandler, 3 }, + { FailedTaskHandler, 4 }, + { FailedEvaluatorHandler, 5 }, + { HttpServerHandler, 6 }, + { CompletedTaskHandler, 7 }, + { RunningTaskHandler, 8 }, + { SuspendedTaskHandler, 9 }, + { CompletedEvaluatorHandler, 10 }, + { ClosedContextHandler, 11 }, + { FailedContextHandler, 12 }, + { ContextMessageHandler, 13 }, + { DriverRestartHandler, 14 }, + { DriverRestartActiveContextHandler, 15 }, + { DriverRestartRunningTaskHandler, 16 }, + }; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/DriverConfigGenerator.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverConfigGenerator.cs b/lang/cs/Org.Apache.REEF.Driver/DriverConfigGenerator.cs new file mode 100644 index 0000000..08cccc0 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/DriverConfigGenerator.cs @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Collections.Generic; +using System.Globalization; +using System.IO; +using Org.Apache.REEF.Driver.bridge; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Implementations.Configuration; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Protobuf; + +namespace Org.Apache.REEF.Driver +{ + public class DriverConfigGenerator + { + public const string DriverConfigFile = "driver.config"; + public const string JobDriverConfigFile = "jobDriver.config"; + public const string DriverChFile = "driverClassHierarchy.bin"; + public const string HttpServerConfigFile = "httpServer.config"; + public const string NameServerConfigFile = "nameServer.config"; + public const string UserSuppliedGlobalLibraries = "userSuppliedGlobalLibraries.txt"; + + private static readonly Logger Log = Logger.GetLogger(typeof(DriverConfigGenerator)); + + public static void DriverConfigurationBuilder(DriverConfigurationSettings driverConfigurationSettings) + { + ExtractConfigFromJar(driverConfigurationSettings.JarFileFolder); + + if (!File.Exists(DriverChFile)) + { + Log.Log(Level.Warning, string.Format(CultureInfo.CurrentCulture, "There is no file {0} extracted from the jar file at {1}.", DriverChFile, driverConfigurationSettings.JarFileFolder)); + return; + } + + if (!File.Exists(HttpServerConfigFile)) + { + Log.Log(Level.Warning, string.Format(CultureInfo.CurrentCulture, "There is no file {0} extracted from the jar file at {1}.", HttpServerConfigFile, driverConfigurationSettings.JarFileFolder)); + return; + } + + if (!File.Exists(JobDriverConfigFile)) + { + Log.Log(Level.Warning, string.Format(CultureInfo.CurrentCulture, "There is no file {0} extracted from the jar file at {1}.", JobDriverConfigFile, driverConfigurationSettings.JarFileFolder)); + return; + } + + if (!File.Exists(NameServerConfigFile)) + { + Log.Log(Level.Warning, string.Format(CultureInfo.CurrentCulture, "There is no file {0} extracted from the jar file at {1}.", NameServerConfigFile, driverConfigurationSettings.JarFileFolder)); + return; + } + + AvroConfigurationSerializer serializer = new AvroConfigurationSerializer(); + + IClassHierarchy drvierClassHierarchy = ProtocolBufferClassHierarchy.DeSerialize(DriverChFile); + + AvroConfiguration jobDriverAvroconfiguration = serializer.AvroDeseriaizeFromFile(JobDriverConfigFile); + IConfiguration jobDriverConfiguration = serializer.FromAvro(jobDriverAvroconfiguration, drvierClassHierarchy); + + AvroConfiguration httpAvroconfiguration = serializer.AvroDeseriaizeFromFile(HttpServerConfigFile); + IConfiguration httpConfiguration = serializer.FromAvro(httpAvroconfiguration, drvierClassHierarchy); + + AvroConfiguration nameAvroconfiguration = serializer.AvroDeseriaizeFromFile(NameServerConfigFile); + IConfiguration nameConfiguration = serializer.FromAvro(nameAvroconfiguration, drvierClassHierarchy); + + IConfiguration merged; + + if (driverConfigurationSettings.IncludingHttpServer && driverConfigurationSettings.IncludingNameServer) + { + merged = Configurations.MergeDeserializedConfs(jobDriverConfiguration, httpConfiguration, nameConfiguration); + } + else if (driverConfigurationSettings.IncludingHttpServer) + { + merged = Configurations.MergeDeserializedConfs(jobDriverConfiguration, httpConfiguration); + } + else if (driverConfigurationSettings.IncludingNameServer) + { + merged = Configurations.MergeDeserializedConfs(jobDriverConfiguration, nameConfiguration); + } + else + { + merged = jobDriverConfiguration; + } + + var b = merged.newBuilder(); + + b.BindSetEntry("org.apache.reef.driver.parameters.DriverIdentifier", driverConfigurationSettings.DriverIdentifier); + b.Bind("org.apache.reef.driver.parameters.DriverMemory", driverConfigurationSettings.DriverMemory.ToString(CultureInfo.CurrentCulture)); + b.Bind("org.apache.reef.driver.parameters.DriverJobSubmissionDirectory", driverConfigurationSettings.SubmissionDirectory); + + //add for all the globallibaries + if (File.Exists(UserSuppliedGlobalLibraries)) + { + var globalLibString = File.ReadAllText(UserSuppliedGlobalLibraries); + if (!string.IsNullOrEmpty(globalLibString)) + { + foreach (string fname in globalLibString.Split(',')) + { + b.BindSetEntry("org.apache.reef.driver.parameters.JobGlobalLibraries", fname); + } + } + } + + foreach (string f in Directory.GetFiles(driverConfigurationSettings.ClrFolder)) + { + b.BindSetEntry("org.apache.reef.driver.parameters.JobGlobalFiles", f); + } + + IConfiguration c = b.Build(); + + serializer.ToFile(c, DriverConfigFile); + + Log.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "driver.config is written to: {0} {1}.", Directory.GetCurrentDirectory(), DriverConfigFile)); + + //additional file for easy to read + using (StreamWriter outfile = new StreamWriter(DriverConfigFile + ".txt")) + { + outfile.Write(serializer.ToString(c)); + } + } + + private static void ExtractConfigFromJar(string jarfileFolder) + { + string jarfile = jarfileFolder + Constants.BridgeJarFileName; + List<string> files = new List<string>(); + files.Add(DriverConfigGenerator.HttpServerConfigFile); + files.Add(DriverConfigGenerator.JobDriverConfigFile); + files.Add(DriverConfigGenerator.NameServerConfigFile); + files.Add(DriverConfigGenerator.DriverChFile); + ClrClientHelper.ExtractConfigfileFromJar(jarfile, files, "."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/DriverConfigurationSettings.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverConfigurationSettings.cs b/lang/cs/Org.Apache.REEF.Driver/DriverConfigurationSettings.cs new file mode 100644 index 0000000..98ec5ce --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/DriverConfigurationSettings.cs @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Driver +{ + public class DriverConfigurationSettings + { + // default to "ReefDevClrBridge" + private string _driverIdentifier = "ReefDevClrBridge"; + + // default to _defaultSubmissionDirectory if not provided + private string _submissionDirectory = "reefTmp/job_" + DateTime.Now.Millisecond; + + // deault to 512MB if no value is provided + private int _driverMemory = 512; + + // folder path that constains clr dlls used by reef + private string _clrFolder = "."; + + // folder that contains jar File provided Byte REEF + private string _jarFileFolder = "."; + + // default to true if no value is specified + private bool _includeHttpServer = true; + + // default to true if no value is specified + private bool _includeNameServer = true; + + /// <summary> + /// Memory allocated for driver, default to 512 MB + /// </summary> + public int DriverMemory + { + get + { + return _driverMemory; + } + + set + { + if (value < 0) + { + throw new ArgumentException("driver memory cannot be negatvie value."); + } + _driverMemory = value; + } + } + + /// <summary> + /// Gets or sets a value indicating whether including name server in the config file. + /// </summary> + /// <value> + /// <c>true</c> if [including name server]; otherwise, <c>false</c>. + /// </value> + public bool IncludingNameServer + { + get { return _includeNameServer; } + set { _includeNameServer = value; } + } + + /// <summary> + /// Gets or sets a value indicating whether including HTTP server in the config file. + /// </summary> + /// <value> + /// <c>true</c> if [including HTTP server]; otherwise, <c>false</c>. + /// </value> + public bool IncludingHttpServer + { + get { return _includeHttpServer; } + set { _includeHttpServer = value; } + } + + /// <summary> + /// Driver Identifier, default to "ReefDevClrBridge" + /// </summary> + public string DriverIdentifier + { + get { return _driverIdentifier; } + set { _driverIdentifier = value; } + } + + /// <summary> + /// Driver job submission directory in (H)DFS where jar file shall be uploaded, default to a tmp directory with GUID name + /// If set by CLR user, the user must guarantee the uniquness of the directory across multiple jobs + /// </summary> + public string SubmissionDirectory + { + get { return _submissionDirectory; } + set { _submissionDirectory = value; } + } + + /// <summary> + /// Gets or sets the CLR folder. + /// </summary> + /// <value> + /// The CLR folder. + /// </value> + public string ClrFolder + { + get { return this._clrFolder; } + set { _clrFolder = value; } + } + + /// <summary> + /// Gets or sets the jar file folder. + /// </summary> + /// <value> + /// The jar file folder. + /// </value> + public string JarFileFolder + { + get { return this._jarFileFolder; } + set { _jarFileFolder = value; } + } + } +} \ No newline at end of file
