http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefDriver/ClientManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefDriver/ClientManager.cs b/lang/cs/Source/REEF/reef-common/ReefDriver/ClientManager.cs
new file mode 100644
index 0000000..3cd3e15
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefDriver/ClientManager.cs
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.ProtoBuf.ClienRuntimeProto;
+using System;
+
+// TODO
+namespace Org.Apache.Reef.Driver
+{
+    /// <summary>
+    /// Observer of <see cref="JobControlProto"/> messages.
+    /// Placeholder only: every callback currently throws <see cref="NotImplementedException"/>.
+    /// </summary>
+    public class ClientManager : IObserver<JobControlProto>
+    {
+        /// <summary>
+        /// End-of-stream notification; not implemented.
+        /// </summary>
+        /// <exception cref="NotImplementedException">Always.</exception>
+        public void OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Error notification; not implemented.
+        /// </summary>
+        /// <param name="error">The reported error (ignored).</param>
+        /// <exception cref="NotImplementedException">Always.</exception>
+        public void OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Message notification; not implemented.
+        /// </summary>
+        /// <param name="value">The received job control message (ignored).</param>
+        /// <exception cref="NotImplementedException">Always.</exception>
+        public void OnNext(JobControlProto value)
+        {
+            throw new NotImplementedException();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefDriver/Constants.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefDriver/Constants.cs b/lang/cs/Source/REEF/reef-common/ReefDriver/Constants.cs
new file mode 100644
index 0000000..4e8e68c
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefDriver/Constants.cs
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+
+namespace Org.Apache.Reef.Driver
+{
+    /// <summary>
+    /// Constant names and values shared by the driver code: handler names,
+    /// file names and Java launch settings.
+    /// </summary>
+    public class Constants
+    {
+        // NOTE(review): presumably the handle value meaning "no handler registered" - confirm against callers.
+        public const ulong NullHandler = 0;
+
+        public const string ClassHierarachyBin = "clrClassHierarchy.bin";
+
+        public const string GlobalUserSuppliedJavaLibraries = "userSuppliedGlobalLibraries.txt";
+
+        public const int DefaultMemoryGranularity = 1024;
+
+        // Equals the number of entries returned by Handlers below.
+        public const int HandlersNumber = 17;
+
+        public const string EvaluatorRequestorHandler = "EvaluatorRequestor";
+
+        public const string AllocatedEvaluatorHandler = "AllocatedEvaluator";
+
+        public const string CompletedEvaluatorHandler = "CompletedEvaluator";
+
+        public const string ActiveContextHandler = "ActiveContext";
+
+        public const string ClosedContextHandler = "ClosedContext";
+
+        public const string FailedContextHandler = "FailedContext";
+
+        public const string ContextMessageHandler = "ContextMessage";
+
+        public const string TaskMessageHandler = "TaskMessage";
+
+        public const string FailedTaskHandler = "FailedTask";
+
+        public const string RunningTaskHandler = "RunningTask";
+
+        public const string FailedEvaluatorHandler = "FailedEvaluator";
+
+        public const string CompletedTaskHandler = "CompletedTask";
+
+        public const string SuspendedTaskHandler = "SuspendedTask";
+
+        public const string HttpServerHandler = "HttpServerHandler";
+
+        public const string DriverRestartHandler = "DriverRestart";
+
+        public const string DriverRestartActiveContextHandler = "DriverRestartActiveContext";
+
+        public const string DriverRestartRunningTaskHandler = "DriverRestartRunningTask";
+
+        public const string DriverBridgeConfiguration = Common.Constants.ClrBridgeRuntimeConfiguration;
+
+        public const string DriverAppDirectory = "ReefDriverAppDlls";
+
+        public const string BridgeJarFileName = "reef-bridge-0.11.0-incubating-SNAPSHOT-shaded.jar";
+
+        public const string BridgeLaunchClass = "org.apache.reef.javabridge.generic.Launch";
+
+        public const string BridgeLaunchHeadlessClass = "org.apache.reef.javabridge.generic.LaunchHeadless";
+
+        public const string DirectLauncherClass = "org.apache.reef.runtime.common.Launcher";
+
+        public const string JavaToCLRLoggingConfig = "-Djava.util.logging.config.class=org.apache.reef.util.logging.CLRLoggingConfig";
+
+        public const string JavaVerboseLoggingConfig = "-Djava.util.logging.config.class=org.apache.reef.util.logging.Config";
+
+        /// <summary>
+        /// Maps each handler name to its index (0 through 16).
+        /// A newly allocated dictionary is returned on every access.
+        /// </summary>
+        public static Dictionary<string, int> Handlers
+        {
+            get
+            {
+                Dictionary<string, int> indexByHandlerName = new Dictionary<string, int>();
+                indexByHandlerName.Add(EvaluatorRequestorHandler, 0);
+                indexByHandlerName.Add(AllocatedEvaluatorHandler, 1);
+                indexByHandlerName.Add(ActiveContextHandler, 2);
+                indexByHandlerName.Add(TaskMessageHandler, 3);
+                indexByHandlerName.Add(FailedTaskHandler, 4);
+                indexByHandlerName.Add(FailedEvaluatorHandler, 5);
+                indexByHandlerName.Add(HttpServerHandler, 6);
+                indexByHandlerName.Add(CompletedTaskHandler, 7);
+                indexByHandlerName.Add(RunningTaskHandler, 8);
+                indexByHandlerName.Add(SuspendedTaskHandler, 9);
+                indexByHandlerName.Add(CompletedEvaluatorHandler, 10);
+                indexByHandlerName.Add(ClosedContextHandler, 11);
+                indexByHandlerName.Add(FailedContextHandler, 12);
+                indexByHandlerName.Add(ContextMessageHandler, 13);
+                indexByHandlerName.Add(DriverRestartHandler, 14);
+                indexByHandlerName.Add(DriverRestartActiveContextHandler, 15);
+                indexByHandlerName.Add(DriverRestartRunningTaskHandler, 16);
+                return indexByHandlerName;
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefDriver/DriverConfigGenerator.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefDriver/DriverConfigGenerator.cs b/lang/cs/Source/REEF/reef-common/ReefDriver/DriverConfigGenerator.cs
new file mode 100644
index 0000000..46c56c5
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefDriver/DriverConfigGenerator.cs
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using Org.Apache.Reef.Driver.bridge;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Formats;
+using Org.Apache.Reef.Tang.Implementations.Configuration;
+using Org.Apache.Reef.Tang.Interface;
+using Org.Apache.Reef.Tang.Protobuf;
+
+namespace Org.Apache.Reef.Driver
+{
+    /// <summary>
+    /// Generates the driver configuration file (<see cref="DriverConfigFile"/>) by merging the
+    /// configuration files extracted from the bridge jar with the values in a
+    /// <see cref="DriverConfigurationSettings"/> instance.
+    /// </summary>
+    public class DriverConfigGenerator
+    {
+        public const string DriverConfigFile = "driver.config";
+        public const string JobDriverConfigFile = "jobDriver.config";
+        public const string DriverChFile = "driverClassHierarchy.bin";
+        public const string HttpServerConfigFile = "httpServer.config";
+        public const string NameServerConfigFile = "nameServer.config";
+        public const string UserSuppliedGlobalLibraries = "userSuppliedGlobalLibraries.txt";
+
+        private static readonly Logger Log = Logger.GetLogger(typeof(DriverConfigGenerator));
+
+        /// <summary>
+        /// Extracts the configuration files from the bridge jar into the current directory, merges them
+        /// according to <paramref name="driverConfigurationSettings"/>, binds the driver parameters and
+        /// writes the result to <see cref="DriverConfigFile"/> plus a human-readable ".txt" copy.
+        /// If any of the extracted files is missing, a warning is logged and nothing is written.
+        /// </summary>
+        /// <param name="driverConfigurationSettings">Settings that drive the generated configuration.</param>
+        public static void DriverConfigurationBuilder(DriverConfigurationSettings driverConfigurationSettings)
+        {
+            ExtractConfigFromJar(driverConfigurationSettings.JarFileFolder);
+
+            // All four files must have been extracted; the check order matches the original warnings.
+            string[] requiredFiles = { DriverChFile, HttpServerConfigFile, JobDriverConfigFile, NameServerConfigFile };
+            foreach (string requiredFile in requiredFiles)
+            {
+                if (!File.Exists(requiredFile))
+                {
+                    Log.Log(Level.Warning, string.Format(CultureInfo.CurrentCulture, "There is no file {0} extracted from the jar file at {1}.", requiredFile, driverConfigurationSettings.JarFileFolder));
+                    return;
+                }
+            }
+
+            AvroConfigurationSerializer serializer = new AvroConfigurationSerializer();
+
+            IClassHierarchy driverClassHierarchy = ProtocolBufferClassHierarchy.DeSerialize(DriverChFile);
+
+            IConfiguration jobDriverConfiguration = LoadConfiguration(serializer, JobDriverConfigFile, driverClassHierarchy);
+            IConfiguration httpConfiguration = LoadConfiguration(serializer, HttpServerConfigFile, driverClassHierarchy);
+            IConfiguration nameConfiguration = LoadConfiguration(serializer, NameServerConfigFile, driverClassHierarchy);
+
+            IConfiguration merged;
+
+            if (driverConfigurationSettings.IncludingHttpServer && driverConfigurationSettings.IncludingNameServer)
+            {
+                merged = Configurations.MergeDeserializedConfs(jobDriverConfiguration, httpConfiguration, nameConfiguration);
+            }
+            else if (driverConfigurationSettings.IncludingHttpServer)
+            {
+                merged = Configurations.MergeDeserializedConfs(jobDriverConfiguration, httpConfiguration);
+            }
+            else if (driverConfigurationSettings.IncludingNameServer)
+            {
+                merged = Configurations.MergeDeserializedConfs(jobDriverConfiguration, nameConfiguration);
+            }
+            else
+            {
+                merged = jobDriverConfiguration;
+            }
+
+            var b = merged.newBuilder();
+
+            b.BindSetEntry("org.apache.reef.driver.parameters.DriverIdentifier", driverConfigurationSettings.DriverIdentifier);
+
+            // The value ends up in a machine-read configuration file, so format it culture-independently.
+            b.Bind("org.apache.reef.driver.parameters.DriverMemory", driverConfigurationSettings.DriverMemory.ToString(CultureInfo.InvariantCulture));
+            b.Bind("org.apache.reef.driver.parameters.DriverJobSubmissionDirectory", driverConfigurationSettings.SubmissionDirectory);
+
+            // add all the user-supplied global libraries (comma-separated list)
+            if (File.Exists(UserSuppliedGlobalLibraries))
+            {
+                string globalLibString = File.ReadAllText(UserSuppliedGlobalLibraries);
+                foreach (string rawName in globalLibString.Split(','))
+                {
+                    // Drop surrounding whitespace (e.g. a trailing newline in the file) and skip
+                    // empty entries so that no bogus library name is bound.
+                    string fname = rawName.Trim();
+                    if (fname.Length > 0)
+                    {
+                        b.BindSetEntry("org.apache.reef.driver.parameters.JobGlobalLibraries", fname);
+                    }
+                }
+            }
+
+            foreach (string f in Directory.GetFiles(driverConfigurationSettings.ClrFolder))
+            {
+                b.BindSetEntry("org.apache.reef.driver.parameters.JobGlobalFiles", f);
+            }
+
+            IConfiguration c = b.Build();
+
+            serializer.ToFile(c, DriverConfigFile);
+
+            Log.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "driver.config is written to: {0} {1}.", Directory.GetCurrentDirectory(), DriverConfigFile));
+
+            // additional text copy that is easy to read
+            using (StreamWriter outfile = new StreamWriter(DriverConfigFile + ".txt"))
+            {
+                outfile.Write(serializer.ToString(c));
+            }
+        }
+
+        /// <summary>
+        /// Reads an Avro configuration file and converts it to a configuration using the given class hierarchy.
+        /// </summary>
+        private static IConfiguration LoadConfiguration(AvroConfigurationSerializer serializer, string fileName, IClassHierarchy classHierarchy)
+        {
+            AvroConfiguration avroConfiguration = serializer.AvroDeseriaizeFromFile(fileName);
+            return serializer.FromAvro(avroConfiguration, classHierarchy);
+        }
+
+        /// <summary>
+        /// Extracts the configuration and class hierarchy files from the bridge jar located in
+        /// <paramref name="jarfileFolder"/> into the current directory (".").
+        /// </summary>
+        /// <param name="jarfileFolder">Folder containing the bridge jar, with or without a trailing separator.</param>
+        private static void ExtractConfigFromJar(string jarfileFolder)
+        {
+            // Path.Combine inserts the directory separator when the folder lacks one; plain string
+            // concatenation turned the default folder "." into ".reef-bridge-...jar".
+            string jarfile = Path.Combine(jarfileFolder ?? string.Empty, Constants.BridgeJarFileName);
+            List<string> files = new List<string>
+            {
+                DriverConfigGenerator.HttpServerConfigFile,
+                DriverConfigGenerator.JobDriverConfigFile,
+                DriverConfigGenerator.NameServerConfigFile,
+                DriverConfigGenerator.DriverChFile
+            };
+            ClrClientHelper.ExtractConfigfileFromJar(jarfile, files, ".");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefDriver/DriverConfigurationSettings.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefDriver/DriverConfigurationSettings.cs b/lang/cs/Source/REEF/reef-common/ReefDriver/DriverConfigurationSettings.cs
new file mode 100644
index 0000000..463e983
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefDriver/DriverConfigurationSettings.cs
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.Reef.Utilities.Logging;
+
+namespace Org.Apache.Reef.Driver
+{
+    /// <summary>
+    /// Settings consumed when the driver configuration is generated. Every setting has a default.
+    /// </summary>
+    public class DriverConfigurationSettings
+    {
+        // default to "ReefDevClrBridge"
+        private string _driverIdentifier = "ReefDevClrBridge";
+
+        // default to a temporary directory with a GUID-based name if not provided.
+        // DateTime.Now.Millisecond only has 1000 possible values (0-999), so two jobs could
+        // easily end up with the same directory; a GUID keeps the default unique per instance.
+        private string _submissionDirectory = "reefTmp/job_" + Guid.NewGuid().ToString("N");
+
+        // default to 512MB if no value is provided
+        private int _driverMemory = 512;
+
+        // folder path that contains the clr dlls used by reef
+        private string _clrFolder = ".";
+
+        // folder that contains the jar file provided by REEF
+        private string _jarFileFolder = ".";
+
+        // default to true if no value is specified
+        private bool _includeHttpServer = true;
+
+        // default to true if no value is specified
+        private bool _includeNameServer = true;
+
+        /// <summary>
+        /// Memory allocated for driver, default to 512 MB
+        /// </summary>
+        /// <exception cref="ArgumentException">The assigned value is negative.</exception>
+        public int DriverMemory
+        {
+            get
+            {
+                return _driverMemory;
+            }
+
+            set
+            {
+                if (value < 0)
+                {
+                    throw new ArgumentException("driver memory cannot be negatvie value.");
+                }
+                _driverMemory = value;
+            }
+        }
+
+        /// <summary>
+        /// Gets or sets a value indicating whether including name server in the config file.
+        /// </summary>
+        /// <value>
+        ///   <c>true</c> if [including name server]; otherwise, <c>false</c>.
+        /// </value>
+        public bool IncludingNameServer
+        {
+            get { return _includeNameServer; }
+            set { _includeNameServer = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets a value indicating whether including HTTP server in the config file.
+        /// </summary>
+        /// <value>
+        ///   <c>true</c> if [including HTTP server]; otherwise, <c>false</c>.
+        /// </value>
+        public bool IncludingHttpServer
+        {
+            get { return _includeHttpServer; }
+            set { _includeHttpServer = value; }
+        }
+
+        /// <summary>
+        /// Driver Identifier, default to "ReefDevClrBridge" 
+        /// </summary>
+        public string DriverIdentifier
+        {
+            get { return _driverIdentifier; }
+            set { _driverIdentifier = value; }
+        }
+
+        /// <summary>
+        /// Driver job submission directory in (H)DFS where jar file shall be uploaded, default to a tmp directory with GUID name
+        /// If set by CLR user, the user must guarantee the uniqueness of the directory across multiple jobs
+        /// </summary>
+        public string SubmissionDirectory
+        {
+            get { return _submissionDirectory; }
+            set { _submissionDirectory = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the CLR folder.
+        /// </summary>
+        /// <value>
+        /// The CLR folder.
+        /// </value>
+        public string ClrFolder
+        {
+            get { return _clrFolder; }
+            set { _clrFolder = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the jar file folder.
+        /// </summary>
+        /// <value>
+        /// The jar file folder.
+        /// </value>
+        public string JarFileFolder
+        {
+            get { return _jarFileFolder; }
+            set { _jarFileFolder = value; }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefDriver/DriverManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefDriver/DriverManager.cs b/lang/cs/Source/REEF/reef-common/ReefDriver/DriverManager.cs
new file mode 100644
index 0000000..b0efc2a
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefDriver/DriverManager.cs
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common;
+using Org.Apache.Reef.Common.Api;
+using Org.Apache.Reef.Common.Catalog;
+using Org.Apache.Reef.Common.Evaluator;
+using Org.Apache.Reef.Common.Exceptions;
+using Org.Apache.Reef.Common.ProtoBuf.DriverRuntimeProto;
+using Org.Apache.Reef.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.Reef.Common.ProtoBuf.ReefProtocol;
+using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.Reef.Driver.Bridge;
+using Org.Apache.Reef.Driver.Evaluator;
+using Org.Apache.Reef.Utilities;
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Implementations;
+using Org.Apache.Reef.Tang.Interface;
+using Org.Apache.Reef.Wake.Remote;
+using Org.Apache.Reef.Wake.Time;
+using Org.Apache.Reef.Wake.Time.Runtime.Event;
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+
+namespace Org.Apache.Reef.Driver
+{
+    /// <summary>
+    /// Keeps one <see cref="EvaluatorManager"/> per allocated evaluator and reacts to runtime status,
+    /// resource status, resource allocation, node descriptor and clock events.
+    /// </summary>
+    public class DriverManager : 
+        IEvaluatorRequestor, 
+        IObserver<RuntimeStatusProto>, 
+        IObserver<ResourceStatusProto>,
+        IObserver<ResourceAllocationProto>,
+        IObserver<NodeDescriptorProto>,
+        IObserver<RuntimeStart>,
+        IObserver<RuntimeStop>,
+        IObserver<IdleClock>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(DriverManager));
+
+        private IInjector _injector;
+
+        private IInjectionFuture<IClock> _clockFuture; 
+
+        private ResourceCatalogImpl _resourceCatalog;
+
+        private IInjectionFuture<IResourceRequestHandler> _futureResourceRequestHandler;
+
+        // Evaluator managers keyed by evaluator id. The dictionary instance is also the lock
+        // object that guards every access to it.
+        private readonly Dictionary<string, EvaluatorManager> _evaluators = new Dictionary<string, EvaluatorManager>();
+
+        private EvaluatorHeartBeatSanityChecker _sanityChecker = new EvaluatorHeartBeatSanityChecker();
+
+        private ClientJobStatusHandler _clientJobStatusHandler;
+
+        // NOTE: nothing in this class assigns the following three members a non-null value yet,
+        // so every use below is null-guarded.
+        private IDisposable _heartbeatConnectionChannel;
+
+        private IDisposable _errorChannel;
+
+        private IObserver<RuntimeErrorProto> _runtimeErrorHandler;
+
+        // Backing store of _runtimeStatusProto; starts in State.INIT.
+        private RuntimeStatusProto _runtimeStatus = CreateRuntimeStatus(State.INIT);
+
+        /// <summary>
+        /// Stores the injected collaborators. <paramref name="remoteManager"/> and
+        /// <paramref name="clientRId"/> are currently not used.
+        /// </summary>
+        public DriverManager(
+            IInjector injector,
+            ResourceCatalogImpl resourceCatalog,
+            IRemoteManager<REEFMessage> remoteManager,
+            IInjectionFuture<IClock> clockFuture,
+            IInjectionFuture<IResourceRequestHandler> futureResourceRequestHandler,
+            ClientJobStatusHandler clientJobStatusHandler,
+            string clientRId)
+        {
+            _injector = injector;
+            _clockFuture = clockFuture;
+            _resourceCatalog = resourceCatalog;
+            _futureResourceRequestHandler = futureResourceRequestHandler;
+            _clientJobStatusHandler = clientJobStatusHandler;
+
+            _heartbeatConnectionChannel = null;
+            _errorChannel = null;
+            _runtimeErrorHandler = null;
+            LOGGER.Log(Level.Info, "DriverManager instantiated");
+        }
+
+        /// <summary>
+        /// Gets the resource catalog passed to the constructor. The setter ignores the assigned value.
+        /// </summary>
+        public IResourceCatalog ResourceCatalog
+        {
+            get
+            {
+                return _resourceCatalog;
+            }
+
+            set
+            {
+            }
+        }
+
+        /// <summary>
+        /// Last known runtime status. Backed by a field: the previous implementation assigned the
+        /// property to itself in the setter (endless recursion) and returned a brand-new INIT
+        /// status from every getter call, so no state was ever retained.
+        /// </summary>
+        private RuntimeStatusProto _runtimeStatusProto
+        {
+            get
+            {
+                return _runtimeStatus;
+            }
+
+            set
+            {
+                _runtimeStatus = value;
+            }
+        }
+
+        /// <summary>
+        /// Builds a resource request from <paramref name="request"/> (count and, when positive,
+        /// memory size). The request is not forwarded yet: the call to the resource request
+        /// handler is still commented out below.
+        /// </summary>
+        /// <param name="request">The evaluator request.</param>
+        public void Submit(IEvaluatorRequest request)
+        {
+            LOGGER.Log(Level.Info, "Got an EvaluatorRequest");
+            ResourceRequestProto proto = new ResourceRequestProto();
+            //TODO: request.size deprecated should use megabytes instead
+            //switch (request.Size)
+            //{
+            //        case EvaluatorRequest.EvaluatorSize.SMALL:
+            //        proto.resource_size = SIZE.SMALL;
+            //        break;
+            //        case EvaluatorRequest.EvaluatorSize.MEDIUM:
+            //        proto.resource_size = SIZE.MEDIUM;
+            //        break;
+            //        case EvaluatorRequest.EvaluatorSize.LARGE:
+            //        proto.resource_size = SIZE.LARGE;
+            //        break;
+            //        case EvaluatorRequest.EvaluatorSize.XLARGE:
+            //        proto.resource_size = SIZE.XLARGE;
+            //        break;
+            //    default:
+            //        throw new InvalidOperationException("invalid request size" + request.Size);
+            //}
+            proto.resource_count = request.Number;
+            if (request.MemoryMegaBytes > 0)
+            {
+                proto.memory_size = request.MemoryMegaBytes;
+            }
+
+            //final ResourceCatalog.Descriptor descriptor = req.getDescriptor();
+            //if (descriptor != null) {
+            //  if (descriptor instanceof RackDescriptor) {
+            //    request.addRackName(descriptor.getName());
+            //  } else if (descriptor instanceof NodeDescriptor) {
+            //    request.addNodeName(descriptor.getName());
+            //  }
+            //}
+
+            //_futureResourceRequestHandler.Get().OnNext(proto);
+        }
+
+        /// <summary>
+        /// Removes the given evaluator manager from the set of tracked evaluators.
+        /// An InvalidOperationException is raised through Exceptions.Throw when it is not tracked.
+        /// </summary>
+        /// <param name="evaluatorManager">The evaluator manager to remove.</param>
+        public void Release(EvaluatorManager evaluatorManager)
+        {
+            // Lock the same object as every other accessor of _evaluators; locking "this"
+            // did not exclude the handlers that lock _evaluators.
+            lock (_evaluators)
+            {
+                string evaluatorManagerId = evaluatorManager.Id;
+                if (!_evaluators.Remove(evaluatorManagerId))
+                {
+                    var e = new InvalidOperationException("Trying to remove an unknown evaluator manager with id " + evaluatorManagerId);
+                    Exceptions.Throw(e, LOGGER);
+                }
+            }
+        }
+
+        /// <summary>
+        /// This handles runtime error occurs on the evaluator
+        /// </summary>
+        /// <param name="runtimeErrorProto"></param>
+        public void Handle(RuntimeErrorProto runtimeErrorProto)
+        {
+            FailedRuntime error = new FailedRuntime(runtimeErrorProto);
+            LOGGER.Log(Level.Warning, "Runtime error:" + error);
+
+            EvaluatorException evaluatorException = error.Cause != null
+                ? new EvaluatorException(error.Id, error.Cause.Value)
+                : new EvaluatorException(error.Id, "Runtime error");
+            EvaluatorManager evaluatorManager;
+            lock (_evaluators)
+            {
+                if (!_evaluators.TryGetValue(error.Id, out evaluatorManager))
+                {
+                    LOGGER.Log(Level.Warning, "Unknown evaluator runtime error: " + error.Cause);
+                }
+            }
+
+            // The evaluator manager is invoked outside the lock, as before.
+            if (null != evaluatorManager)
+            {
+                evaluatorManager.Handle(evaluatorException);
+            }
+        }
+
+        /// <summary>
+        /// A RuntimeStatusProto comes from the ResourceManager layer indicating its current status
+        /// </summary>
+        /// <param name="runtimeStatusProto"></param>
+        public void OnNext(RuntimeStatusProto runtimeStatusProto)
+        {
+            Handle(runtimeStatusProto);
+        }
+
+        /// <summary>
+        /// A ResourceStatusProto message comes from the ResourceManager layer to indicate what it thinks
+        /// about the current state of a given resource. Ideally, we should think the same thing.
+        /// </summary>
+        /// <param name="resourceStatusProto"></param>
+        public void OnNext(ResourceStatusProto resourceStatusProto)
+        {
+            Handle(resourceStatusProto);
+        }
+
+        /// <summary>
+        /// A ResourceAllocationProto indicates a resource allocation given by the ResourceManager layer.
+        /// </summary>
+        /// <param name="resourceAllocationProto"></param>
+        public void OnNext(ResourceAllocationProto resourceAllocationProto)
+        {
+            Handle(resourceAllocationProto);
+        }
+
+        /// <summary>
+        ///  A NodeDescriptorProto defines a new node in the cluster. We should add this to the resource catalog
+        /// so that clients can make resource requests against it.
+        /// </summary>
+        /// <param name="nodeDescriptorProto"></param>
+        public void OnNext(NodeDescriptorProto nodeDescriptorProto)
+        {
+            _resourceCatalog.Handle(nodeDescriptorProto);
+        }
+
+        /// <summary>
+        /// This EventHandler is subscribed to the StartTime event of the Clock statically. It therefore provides the entrance
+        /// point to REEF.
+        /// Records the runtime status as RUNNING with no outstanding container requests.
+        /// </summary>
+        /// <param name="runtimeStart"></param>
+        public void OnNext(RuntimeStart runtimeStart)
+        {
+            LOGGER.Log(Level.Info, "RuntimeStart: " + runtimeStart);
+            _runtimeStatusProto = CreateRuntimeStatus(State.RUNNING);
+        }
+
+        /// <summary>
+        /// Handles RuntimeStop: reports the stop exception (if any) to the runtime error handler,
+        /// disposes all tracked evaluator managers, then disposes the channels and the client job
+        /// status handler.
+        /// </summary>
+        /// <param name="runtimeStop"></param>
+        public void OnNext(RuntimeStop runtimeStop)
+        {
+            LOGGER.Log(Level.Info, "RuntimeStop: " + runtimeStop);
+            if (runtimeStop.Exception != null)
+            {
+                string exceptionMessage = runtimeStop.Exception.Message;
+                LOGGER.Log(Level.Warning, "Sending runtime error:" + exceptionMessage);
+                RuntimeErrorProto runtimeErrorProto = new RuntimeErrorProto();
+                runtimeErrorProto.message = exceptionMessage;
+                runtimeErrorProto.exception = ByteUtilities.StringToByteArrays(exceptionMessage);
+                runtimeErrorProto.name = "REEF";
+                if (SendRuntimeError(runtimeErrorProto))
+                {
+                    LOGGER.Log(Level.Warning, "DONE Sending runtime error: " + exceptionMessage);
+                }
+            }
+
+            lock (_evaluators)
+            {
+                // Iterate over a snapshot so that the loop stays valid even if disposing an
+                // evaluator manager removes it from _evaluators (e.g. through Release).
+                foreach (EvaluatorManager evaluatorManager in new List<EvaluatorManager>(_evaluators.Values))
+                {
+                    LOGGER.Log(Level.Warning, "Unclean shutdown of evaluator: " + evaluatorManager.Id);
+                    evaluatorManager.Dispose();
+                }
+            }
+
+            try
+            {
+                // The channels may never have been opened; skipping them lets the client job
+                // status handler still be disposed.
+                if (_heartbeatConnectionChannel != null)
+                {
+                    _heartbeatConnectionChannel.Dispose();
+                }
+                if (_errorChannel != null)
+                {
+                    _errorChannel.Dispose();
+                }
+                Optional<Exception> stopCause = runtimeStop.Exception != null ?
+                    Optional<Exception>.Of(runtimeStop.Exception) : Optional<Exception>.Empty();
+                _clientJobStatusHandler.Dispose(stopCause);
+
+                LOGGER.Log(Level.Info, "driver manager closed");
+            }
+            catch (Exception e)
+            {
+                Exceptions.Caught(e, Level.Error, "Error disposing Driver manager", LOGGER);
+                Exceptions.Throw(new InvalidOperationException("Cannot dispose driver manager"), LOGGER);
+            }
+        }
+
+        /// <summary>
+        /// Logs the current runtime status and disposes the clock when the runtime is RUNNING with
+        /// no outstanding container requests and no container allocations.
+        /// </summary>
+        /// <param name="value">The idle clock event.</param>
+        public void OnNext(IdleClock value)
+        {
+            string message = string.Format(
+                CultureInfo.InvariantCulture,
+                "IdleClock: [{0}], RuntimeState [{1}], Outstanding container requests [{2}], Container allocation count[{3}]",
+                value + Environment.NewLine,
+                _runtimeStatusProto.state + Environment.NewLine,
+                _runtimeStatusProto.outstanding_container_requests + Environment.NewLine,
+                _runtimeStatusProto.container_allocation.Count);
+            LOGGER.Log(Level.Info, message);
+
+            lock (_evaluators)
+            {
+                if (_runtimeStatusProto.state == State.RUNNING
+                    && _runtimeStatusProto.outstanding_container_requests == 0
+                    && _runtimeStatusProto.container_allocation.Count == 0)
+                {
+                    LOGGER.Log(Level.Info, "Idle runtime shutdown");
+                    _clockFuture.Get().Dispose();
+                }
+            }
+        }
+
+        void IObserver<IdleClock>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<IdleClock>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<RuntimeStop>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<RuntimeStop>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<RuntimeStart>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<RuntimeStart>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<NodeDescriptorProto>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<NodeDescriptorProto>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<ResourceAllocationProto>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<ResourceAllocationProto>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<ResourceStatusProto>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<ResourceStatusProto>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<RuntimeStatusProto>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<RuntimeStatusProto>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Creates a runtime status named "REEF" in the given state with no outstanding container requests.
+        /// </summary>
+        private static RuntimeStatusProto CreateRuntimeStatus(State state)
+        {
+            RuntimeStatusProto proto = new RuntimeStatusProto();
+            proto.state = state;
+            proto.name = "REEF";
+            proto.outstanding_container_requests = 0;
+            return proto;
+        }
+
+        /// <summary>
+        /// Forwards a runtime error to the runtime error handler if one is set.
+        /// </summary>
+        /// <returns><c>true</c> if the error was forwarded; <c>false</c> if no handler is set.</returns>
+        private bool SendRuntimeError(RuntimeErrorProto runtimeErrorProto)
+        {
+            if (_runtimeErrorHandler == null)
+            {
+                LOGGER.Log(Level.Warning, "No runtime error handler is set; the runtime error is not forwarded.");
+                return false;
+            }
+            _runtimeErrorHandler.OnNext(runtimeErrorProto);
+            return true;
+        }
+
+        /// <summary>
+        /// Something went wrong at the runtime layer (either driver or evaluator). This
+        /// method simply forwards the RuntimeErrorProto to the client via the RuntimeErrorHandler
+        /// (when one is set) and then disposes the clock.
+        /// </summary>
+        /// <param name="runtimeErrorProto"></param>
+        private void Fail(RuntimeErrorProto runtimeErrorProto)
+        {
+            SendRuntimeError(runtimeErrorProto);
+            _clockFuture.Get().Dispose();
+        }
+
+        /// <summary>
+        ///  Helper method to create a new EvaluatorManager instance.
+        ///  Both parameters are currently only used for logging; they are not yet bound into the injector.
+        /// </summary>
+        /// <param name="id">identifier of the Evaluator</param>
+        /// <param name="descriptor"> NodeDescriptor on which the Evaluator executes.</param>
+        /// <returns>new EvaluatorManager instance.</returns>
+        private EvaluatorManager GetNewEvaluatorManagerInstance(string id, EvaluatorDescriptorImpl descriptor)
+        {
+            LOGGER.Log(Level.Info, "Creating Evaluator Manager: " + id);
+            //TODO bindVolatieParameter
+            return (EvaluatorManager)_injector.GetInstance(typeof(EvaluatorManager));
+        }
+
+        /// <summary>
+        ///  Receives and routes heartbeats from Evaluators.
+        /// </summary>
+        /// <param name="evaluatorHearBeatProto"></param>
+        private void Handle(IRemoteMessage<EvaluatorHeartbeatProto> evaluatorHearBeatProto)
+        {
+            EvaluatorHeartbeatProto heartbeat = evaluatorHearBeatProto.Message;
+            EvaluatorStatusProto status = heartbeat.evaluator_status;
+            string evaluatorId = status.evaluator_id;
+            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Heartbeat from Evaluator {0} with state {1} timestamp {2}", evaluatorId, status.state, heartbeat.timestamp));
+            _sanityChecker.check(evaluatorId, heartbeat.timestamp);
+
+            lock (_evaluators)
+            {
+                EvaluatorManager evaluatorManager;
+                if (_evaluators.TryGetValue(evaluatorId, out evaluatorManager))
+                {
+                    evaluatorManager.Handle(evaluatorHearBeatProto);
+                }
+                else
+                {
+                    string msg = "Contact from unkonwn evaluator with id: " + evaluatorId;
+                    if (heartbeat.evaluator_status != null)
+                    {
+                        msg += " with state" + status.state;
+                    }
+                    LOGGER.Log(Level.Error, msg);
+                    Exceptions.Throw(new InvalidOperationException(msg), LOGGER);
+                }
+            }            
+        }
+
+        /// <summary>
+        /// This resource status message comes from the ResourceManager layer; telling me what it thinks
+        /// about the state of the resource executing an Evaluator; This method simply passes the message
+        /// off to the referenced EvaluatorManager
+        /// </summary>
+        /// <param name="resourceStatusProto"></param>
+        private void Handle(ResourceStatusProto resourceStatusProto)
+        {
+            lock (_evaluators)
+            {
+                EvaluatorManager evaluatorManager;
+                if (_evaluators.TryGetValue(resourceStatusProto.identifier, out evaluatorManager))
+                {
+                    evaluatorManager.Handle(resourceStatusProto);
+                }
+                else
+                {
+                    var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown resource status from evaluator {0} with state {1}", resourceStatusProto.identifier, resourceStatusProto.state));
+                    Exceptions.Throw(e, LOGGER);
+                }
+            }
+        }
+
+        /// <summary>
+        ///  This method handles resource allocations by creating a new EvaluatorManager instance.
+        /// </summary>
+        /// <param name="resourceAllocationProto"></param>
+        private void Handle(ResourceAllocationProto resourceAllocationProto)
+        {
+            lock (_evaluators)
+            {
+                try
+                {
+                    INodeDescriptor nodeDescriptor = _resourceCatalog.GetNode(resourceAllocationProto.node_id);
+                    if (nodeDescriptor == null)
+                    {
+                        Exceptions.Throw(new InvalidOperationException("Unknown resurce: " + resourceAllocationProto.node_id), LOGGER);
+                    }
+                    EvaluatorDescriptorImpl evaluatorDescriptor = new EvaluatorDescriptorImpl(nodeDescriptor, EvaluatorType.UNDECIDED, resourceAllocationProto.resource_memory, resourceAllocationProto.virtual_cores);
+                    LOGGER.Log(Level.Info, "Resource allocation: new evaluator id: " + resourceAllocationProto.identifier);
+                    EvaluatorManager evaluatorManager = GetNewEvaluatorManagerInstance(resourceAllocationProto.identifier, evaluatorDescriptor);
+                    _evaluators.Add(resourceAllocationProto.identifier, evaluatorManager);
+                }
+                catch (Exception e)
+                {
+                    Exceptions.Caught(e, Level.Error, LOGGER);
+                    Exceptions.Throw(new InvalidOperationException("Error handling resourceAllocationProto."), LOGGER);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Reacts to a runtime status: FAILED forwards the error and disposes the clock, DONE
+        /// disposes the clock, RUNNING records the status and disposes the clock when the clock is
+        /// idle and there are no outstanding container requests or allocations.
+        /// </summary>
+        /// <param name="runtimeStatusProto"></param>
+        private void Handle(RuntimeStatusProto runtimeStatusProto)
+        {
+            State runtimeState = runtimeStatusProto.state;
+            LOGGER.Log(Level.Info, "Runtime status: " + runtimeStatusProto.state);
+
+            switch (runtimeState)
+            {
+                case State.FAILED:
+                    Fail(runtimeStatusProto.error);
+                    break;
+                case State.DONE:
+                    _clockFuture.Get().Dispose();
+                    break;
+                case State.RUNNING:
+                    lock (_evaluators)
+                    {
+                        _runtimeStatusProto = runtimeStatusProto;
+                        if (_clockFuture.Get().IsIdle()
+                            && runtimeStatusProto.outstanding_container_requests == 0
+                            && runtimeStatusProto.container_allocation.Count == 0)
+                        {
+                            _clockFuture.Get().Dispose();
+                        }
+                    }
+                    break;
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefDriver/DriverRuntimeConfiguration.cs
---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefDriver/DriverRuntimeConfiguration.cs b/lang/cs/Source/REEF/reef-common/ReefDriver/DriverRuntimeConfiguration.cs new file mode 100644 index 0000000..a3dff0e --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefDriver/DriverRuntimeConfiguration.cs @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using Org.Apache.Reef.Common; +using Org.Apache.Reef.Common.Api; +using Org.Apache.Reef.Common.Catalog; +using Org.Apache.Reef.Common.Client; +using Org.Apache.Reef.Common.Evaluator; +using Org.Apache.Reef.Driver.Evaluator; +using Org.Apache.Reef.Tang.Formats; +using Org.Apache.Reef.Tang.Util; + +namespace Org.Apache.Reef.Driver +{ + public class DriverRuntimeConfiguration : ConfigurationModuleBuilder + { + public static ConfigurationModule ConfigurationModule + { + get + { + return new DriverRuntimeConfiguration() + // Resource Catalog + .BindImplementation(GenericType<IResourceCatalog>.Class, GenericType<ResourceCatalogImpl>.Class) + + // JobMessageObserver + //.BindImplementation(GenericType<IEvaluatorRequestor>.Class, GenericType<DriverManager>.Class) + .BindImplementation(GenericType<IJobMessageObserver>.Class, GenericType<ClientJobStatusHandler>.Class) + + // JobMessageObserver Wake event handler bindings + .BindNamedParameter(GenericType<DriverRuntimeConfigurationOptions.JobMessageHandler>.Class, GenericType<ClientJobStatusHandler>.Class) + .BindNamedParameter(GenericType<DriverRuntimeConfigurationOptions.JobExceptionHandler>.Class, GenericType<ClientJobStatusHandler>.Class) + + // Client manager + .BindNamedParameter(GenericType<DriverRuntimeConfigurationOptions.JobControlHandler>.Class, GenericType<ClientManager>.Class) + + // Bind the runtime parameters + //.BindNamedParameter(GenericType<RuntimeParameters.NodeDescriptorHandler>.Class, GenericType<DriverManager>.Class) + //.BindNamedParameter(GenericType<RuntimeParameters.ResourceAllocationHandler>.Class, GenericType<DriverManager>.Class) + //.BindNamedParameter(GenericType<RuntimeParameters.ResourceStatusHandler>.Class, GenericType<DriverManager>.Class) + //.BindNamedParameter(GenericType<RuntimeParameters.RuntimeStatusHandler>.Class, GenericType<DriverManager>.Class) + + // Bind to the Clock + //.BindSetEntry(GenericType<IClock.RuntimeStopHandler>.Class, GenericType<DriverManager>.Class) + 
.Build(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefDriver/DriverRuntimeConfigurationOptions.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefDriver/DriverRuntimeConfigurationOptions.cs b/lang/cs/Source/REEF/reef-common/ReefDriver/DriverRuntimeConfigurationOptions.cs new file mode 100644 index 0000000..5de7856 --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefDriver/DriverRuntimeConfigurationOptions.cs @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using Org.Apache.Reef.Common; +using Org.Apache.Reef.Tang.Annotations; + +namespace Org.Apache.Reef.Driver +{ + public class DriverRuntimeConfigurationOptions + { + [NamedParameter(documentation: "Job message handler (see ClientJobStatusHandler)")] + public class JobMessageHandler : Name<ClientJobStatusHandler> + { + } + + [NamedParameter(documentation: "Job exception handler (see ClientJobStatusHandler)")] + public class JobExceptionHandler : Name<ClientJobStatusHandler> + { + } + + [NamedParameter(documentation: "Called when a job control message is received by the client.")] + public class JobControlHandler : Name<ClientManager> + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefDriver/DriverSubmissionSettings.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefDriver/DriverSubmissionSettings.cs b/lang/cs/Source/REEF/reef-common/ReefDriver/DriverSubmissionSettings.cs new file mode 100644 index 0000000..3bdaf6b --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefDriver/DriverSubmissionSettings.cs @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System; +using Org.Apache.Reef.Utilities.Logging; + +namespace Org.Apache.Reef.Driver +{ + // TODO: merge with EvaluatorConfigurations class + public class DriverSubmissionSettings + { + // default to "ReefDevClrBridge" + private string _driverIdentifier; + + // default to _defaultSubmissionDirectory is not provided + private string _submissionDirectory; + + // deault to 512MB if no value is provided + private int _driverMemory = 0; + + // default value, client wait till driver exit + private int _clientWaitTime = -1; + + // default to submit to driver with driver config + private bool _submit = true; + + // default to always update jar before submission + private bool _updateJar = true; + + // default to run on local + private bool _runOnYarn; + + // default to set to info logging + private JavaLoggingSetting _javaLogLevel = JavaLoggingSetting.INFO; + + /// <summary> + /// Whether to update jar file with needed dlls before submission + /// User can choose to reduce startup time by skipping the update, if jar file already contains all necessary dll + /// Note this settig is .NET only, it does not propagate to java side. + /// </summary> + public bool UpdateJarBeforeSubmission + { + get { return _updateJar; } + set { _updateJar = value; } + } + + /// <summary> + /// Determine the vebosity of Java driver's log. + /// Note this parameter is used when launching java process only, it does not propagate to java side. 
+ /// </summary> + public JavaLoggingSetting JavaLogLevel + { + get { return _javaLogLevel; } + set { _javaLogLevel = value; } + } + + /// <summary> + /// Memory allocated for driver, default to 512 MB + /// </summary> + public int DriverMemory + { + get + { + return _driverMemory; + } + + set + { + if (value < 0) + { + throw new ArgumentException("driver memory cannot be negatvie value."); + } + _driverMemory = value; + } + } + + /// <summary> + /// Driver Identifier, default to "ReefDevClrBridge" + /// </summary> + public string DriverIdentifier + { + get + { + return _driverIdentifier; + } + + set + { + _driverIdentifier = value; + } + } + + /// <summary> + /// Whether to submit driver with config after driver configuration is construted, default to True + /// </summary> + public bool Submit + { + get + { + return _submit; + } + + set + { + _submit = value; + } + } + + /// <summary> + /// How long client would wait for Driver, default to wait till driver is done + /// </summary> + public int ClientWaitTime + { + get + { + return _clientWaitTime; + } + + set + { + _clientWaitTime = value; + } + } + + /// <summary> + /// Driver job submission directory in (H)DFS where jar file shall be uploaded, default to a tmp directory with GUID name + /// If set by CLR user, the user must guarantee the uniquness of the directory across multiple jobs + /// </summary> + public string SubmissionDirectory + { + get + { + return _submissionDirectory; + } + + set + { + _submissionDirectory = value; + } + } + + /// <summary> + /// Whether to Run on YARN runtime, default to false + /// </summary> + public bool RunOnYarn + { + get + { + return _runOnYarn; + } + + set + { + _runOnYarn = value; + } + } + + public string ToComamndLineArguments() + { + return + (RunOnYarn ? " -local false" : string.Empty) + + (!Submit ? " -submit false" : string.Empty) + + (DriverMemory > 0 ? " -driver_memory " + DriverMemory : string.Empty) + + (!string.IsNullOrWhiteSpace(DriverIdentifier) ? 
" -drive_id " + DriverIdentifier : string.Empty) + + (ClientWaitTime > 0 ? " -wait_time " + ClientWaitTime : string.Empty) + + (!string.IsNullOrWhiteSpace(SubmissionDirectory) ? " -submission_directory " + SubmissionDirectory : string.Empty); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefDriver/EvaluatorManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefDriver/EvaluatorManager.cs b/lang/cs/Source/REEF/reef-common/ReefDriver/EvaluatorManager.cs new file mode 100644 index 0000000..5dc1ce7 --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefDriver/EvaluatorManager.cs @@ -0,0 +1,655 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using Org.Apache.Reef.Common.Api; +using Org.Apache.Reef.Common.Catalog; +using Org.Apache.Reef.Common.Evaluator; +using Org.Apache.Reef.Common.Exceptions; +using Org.Apache.Reef.Common.ProtoBuf.DriverRuntimeProto; +using Org.Apache.Reef.Common.ProtoBuf.EvaluatorRunTimeProto; +using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto; +using Org.Apache.Reef.Driver.Bridge; +using Org.Apache.Reef.Driver.Context; +using Org.Apache.Reef.Driver.Evaluator; +using Org.Apache.Reef.Driver.Task; +using Org.Apache.Reef.Utilities; +using Org.Apache.Reef.Utilities.Diagnostics; +using Org.Apache.Reef.Utilities.Logging; +using Org.Apache.Reef.Tang.Annotations; +using Org.Apache.Reef.Wake.Remote; +using Org.Apache.Reef.Wake.Time; +using System; +using System.Collections.Generic; +using System.Globalization; +using System.Linq; +using System.Text; + +using TaskMessage = Org.Apache.Reef.Tasks.TaskMessage; + +namespace Org.Apache.Reef.Driver +{ + /// <summary> + /// Manages a single Evaluator instance including all lifecycle instances: + /// (AllocatedEvaluator, CompletedEvaluator, FailedEvaluator). + /// A (periodic) heartbeat channel is established EvaluatorRuntime -> EvaluatorManager. + /// The EvaluatorRuntime will (periodically) send (status) messages to the EvaluatorManager using this heartbeat channel. + /// A (push-based) EventHandler channel is established EvaluatorManager -> EvaluatorRuntime. + /// The EvaluatorManager uses this to forward Driver messages, launch Tasks, and initiate + /// control information (e.g., shutdown, suspend).* Manages a single Evaluator instance including all lifecycle instances: + /// (AllocatedEvaluator, CompletedEvaluator, FailedEvaluator). + /// A (periodic) heartbeat channel is established EvaluatorRuntime -> EvaluatorManager. + /// The EvaluatorRuntime will (periodically) send (status) messages to the EvaluatorManager using this heartbeat channel. 
+ /// A (push-based) EventHandler channel is established EvaluatorManager -> EvaluatorRuntime. + /// The EvaluatorManager uses this to forward Driver messages, launch Tasks, and initiate control information (e.g., shutdown, suspend). + /// </summary> + public class EvaluatorManager : IDisposable, IIdentifiable + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorManager)); + + private STATE _state = STATE.ALLOCATED; + + private IClock _clock; + + // TODO + // private final RemoteManager remoteManager; + private DriverManager _driverManager; + + private IResourceReleaseHandler _resourceReleaseHandler; + + private IResourceLaunchHandler _resourceLaunchHandler; + + private EvaluatorDescriptorImpl _evaluatorDescriptor; + + private string _evaluatorId; + + private IList<EvaluatorContext> _activeContexts = new List<EvaluatorContext>(); + + private HashSet<string> _activeContextIds = new HashSet<string>(); + + private IRunningTask _runningTask = null; + + private IObserver<EvaluatorControlProto> _evaluatorControlHandler = null; + + private bool _isResourceReleased = false; + + //TODO + //private final DispatchingEStage dispatcher; + private EvaluatorType _type = EvaluatorType.CLR; + + public EvaluatorManager( + IClock clock, + //RemoteManager remoteManager, + DriverManager driverManager, + IResourceReleaseHandler resourceReleaseHandler, + IResourceLaunchHandler resourceLaunchHandler, + //REEFErrorHandler errorHandler, + string evaluatorId, + EvaluatorDescriptorImpl evaluatorDescriptor, + ISet<IObservable<IActiveContext>> activeContextEventHandler, + ISet<IObservable<IClosedContext>> closedContextEventHandlers, + ISet<IObservable<FailedContext>> failedContextEventHandlers, + ISet<IObservable<ContextMessage>> contextMessageHandlers, + ISet<IObservable<IRunningTask>> runningTaskEventHandlers, + ISet<IObservable<ICompletedTask>> completedTaskEventHandlers, + ISet<IObservable<ISuspendedTask>> suspendedTaskEventHandlers, + 
ISet<IObservable<TaskMessage>> taskMessageEventHandlers, + ISet<IObservable<FailedTask>> taskExceptionEventHandlers, + ISet<IObservable<IAllocatedEvaluator>> allocatedEvaluatorEventHandlers, + ISet<IObservable<IFailedEvaluator>> failedEvaluatorHandlers, + ISet<IObservable<ICompletedEvaluator>> completedEvaluatorHandlers) + { + _clock = clock; + //_remoteManager = remoteManager; + _driverManager = driverManager; + _resourceReleaseHandler = resourceReleaseHandler; + _resourceLaunchHandler = resourceLaunchHandler; + _evaluatorId = evaluatorId; + _evaluatorDescriptor = evaluatorDescriptor; + + //this.dispatcher = new DispatchingEStage(errorHandler, 16); // 16 threads + + //this.dispatcher.register(ActiveContext.class, activeContextEventHandlers); + //this.dispatcher.register(ClosedContext.class, closedContextEventHandlers); + //this.dispatcher.register(FailedContext.class, failedContextEventHandlers); + //this.dispatcher.register(ContextMessage.class, contextMessageHandlers); + + //this.dispatcher.register(RunningTask.class, runningTaskEventHandlers); + //this.dispatcher.register(CompletedTask.class, completedTaskEventHandlers); + //this.dispatcher.register(SuspendedTask.class, suspendedTaskEventHandlers); + //this.dispatcher.register(TaskMessage.class, taskMessageEventHandlers); + //this.dispatcher.register(FailedTask.class, taskExceptionEventHandlers); + + //this.dispatcher.register(FailedEvaluator.class, failedEvaluatorHandlers); + //this.dispatcher.register(CompletedEvaluator.class, completedEvaluatorHandlers); + //this.dispatcher.register(AllocatedEvaluator.class, allocatedEvaluatorEventHandlers); + + //this.dispatcher.onNext(AllocatedEvaluator.class, + // new AllocatedEvaluatorImpl(this, remoteManager.getMyIdentifier())); + } + + /// <summary> + /// Various states that the EvaluatorManager could be in. The EvaluatorManager is created when a resource has been allocated by the ResourceManager. 
+ /// </summary> + public enum STATE + { + ALLOCATED, // initial state + SUBMITTED, // client called AllocatedEvaluator.submitTask() and we're waiting for first contact + RUNNING, // first contact received, all communication channels established, Evaluator sent to client. + DONE, // clean shutdown + FAILED, // some failure occurred. + KILLED // unclean shutdown + } + + public IEvaluatorDescriptor EvaluatorDescriptor + { + get + { + return _evaluatorDescriptor; + } + } + + public INodeDescriptor NodeDescriptor + { + get + { + return EvaluatorDescriptor.NodeDescriptor; + } + } + + public IRunningTask RunningTask + { + get + { + lock (_evaluatorDescriptor) + { + return _runningTask; + } + } + } + + public string Id + { + get + { + return _evaluatorId; + } + + set + { + } + } + + public EvaluatorType Type + { + get + { + return _type; + } + + set + { + _type = value; + _evaluatorDescriptor.EvaluatorType = value; + } + } + + public void Dispose() + { + lock (_evaluatorDescriptor) + { + if (_state == STATE.RUNNING) + { + LOGGER.Log(Level.Warning, "Dirty shutdown of running evaluator :" + Id); + try + { + // Killing the evaluator means that it doesn't need to send a confirmation; it just dies. + EvaluatorControlProto proto = new EvaluatorControlProto(); + proto.timestamp = DateTime.Now.Ticks; + proto.identifier = Id; + proto.kill_evaluator = new KillEvaluatorProto(); + Handle(proto); + } + finally + { + _state = STATE.KILLED; + } + } + } + + if (!_isResourceReleased) + { + try + { + // We need to wait awhile before returning the container to the RM in order to + // give the EvaluatorRuntime (and Launcher) time to cleanly exit. 
+ + // this.clock.scheduleAlarm(100, new EventHandler<Alarm>() { + //@Override + //public void onNext(final Alarm alarm) { + // EvaluatorManager.this.resourceReleaseHandler.onNext( + // DriverRuntimeProtocol.ResourceReleaseProto.newBuilder() + // .setIdentifier(EvaluatorManager.this.evaluatorId).build()); + } + catch (Exception e) + { + Exceptions.Caught(e, Level.Warning, "Force resource release because the client closed the clock.", LOGGER); + ResourceReleaseProto proto = new ResourceReleaseProto(); + proto.identifier = _evaluatorId; + _resourceReleaseHandler.OnNext(proto); + } + finally + { + _isResourceReleased = true; + _driverManager.Release(this); + } + } + } + + /// <summary> + /// EvaluatorException will trigger is FailedEvaluator and state transition to FAILED + /// </summary> + /// <param name="exception"></param> + public void Handle(EvaluatorException exception) + { + lock (_evaluatorDescriptor) + { + if (_state >= STATE.DONE) + { + return; + } + LOGGER.Log(Level.Warning, "Failed Evaluator: " + Id + exception.Message); + try + { + IList<FailedContext> failedContexts = new List<FailedContext>(); + IList<EvaluatorContext> activeContexts = new List<EvaluatorContext>(_activeContexts); + activeContexts = activeContexts.Reverse().ToList(); + foreach (EvaluatorContext context in activeContexts) + { + Optional<IActiveContext> parentContext = context.ParentId.IsPresent() + ? Optional<IActiveContext>.Of(GetEvaluatorContext(context.ParentId.Value)) + : Optional<IActiveContext>.Empty(); + failedContexts.Add(context.GetFailedContext(parentContext, exception)); + } + + //Optional<FailedTask> failedTask = _runningTask != null ? 
+ // Optional<FailedTask>.Of(new FailedTask(_runningTask.Id, exception)) : Optional<FailedTask>.Empty(); + //LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + failedTask.ToString()); + //this.dispatcher.onNext(FailedEvaluator.class, new FailedEvaluatorImpl( + //exception, failedContextList, failedTaskOptional, this.evaluatorId)); + } + catch (Exception e) + { + Exceptions.CaughtAndThrow(e, Level.Error, "Exception while handling FailedEvaluator.", LOGGER); + } + finally + { + _state = STATE.FAILED; + Dispose(); + } + } + } + + public void Handle(IRemoteMessage<EvaluatorHeartbeatProto> evaluatorHearBeatProtoMessage) + { + lock (_evaluatorDescriptor) + { + EvaluatorHeartbeatProto heartbeatProto = evaluatorHearBeatProtoMessage.Message; + if (heartbeatProto.evaluator_status != null) + { + EvaluatorStatusProto status = heartbeatProto.evaluator_status; + if (status.error != null) + { + Handle(new EvaluatorException(Id, ByteUtilities.ByteArrarysToString(status.error))); + return; + } + else if (_state == STATE.SUBMITTED) + { + string evaluatorRId = evaluatorHearBeatProtoMessage.Identifier.ToString(); + LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorRId); + // TODO + // _evaluatorControlHandler = _remoteManager.getHandler(evaluatorRID, EvaluatorRuntimeProtocol.EvaluatorControlProto.class); + _state = STATE.RUNNING; + LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} is running", _evaluatorId)); + } + } + + LOGGER.Log(Level.Info, "Evaluator heartbeat: " + heartbeatProto); + + EvaluatorStatusProto evaluatorStatusProto = heartbeatProto.evaluator_status; + foreach (ContextStatusProto contextStatusProto in heartbeatProto.context_status) + { + Handle(contextStatusProto, heartbeatProto.task_status != null); + } + + if (heartbeatProto.task_status != null) + { + Handle(heartbeatProto.task_status); + } + + if (evaluatorStatusProto.state == State.FAILED) + { + _state = STATE.FAILED; + EvaluatorException e = evaluatorStatusProto.error != null ? 
+ new EvaluatorException(_evaluatorId, ByteUtilities.ByteArrarysToString(evaluatorStatusProto.error)) : + new EvaluatorException(_evaluatorId, "unknown cause"); + LOGGER.Log(Level.Warning, "Failed evaluator: " + Id + e.Message); + Handle(e); + } + else if (evaluatorStatusProto.state == State.DONE) + { + LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} done", Id)); + _state = STATE.DONE; + + // TODO + // dispatcher.onNext(CompletedEvaluator.class, new CompletedEvaluator() { + //@Override + //public String getId() { + // return EvaluatorManager.this.evaluatorId; + Dispose(); + } + } + LOGGER.Log(Level.Info, "DONE with evaluator heartbeat"); + } + + public void Handle(ResourceLaunchProto resourceLaunchProto) + { + lock (_evaluatorDescriptor) + { + if (_state == STATE.ALLOCATED) + { + _state = STATE.SUBMITTED; + _resourceLaunchHandler.OnNext(resourceLaunchProto); + } + else + { + var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Evaluator manager expected {0} state, but instead is in state {1}", STATE.ALLOCATED, _state)); + Exceptions.Throw(e, LOGGER); + } + } + } + + /// <summary> + /// Packages the TaskControlProto in an EvaluatorControlProto and forward it to the EvaluatorRuntime + /// </summary> + /// <param name="contextControlProto"></param> + public void Handle(ContextControlProto contextControlProto) + { + lock (_evaluatorDescriptor) + { + LOGGER.Log(Level.Info, "Task control message from " + _evaluatorId); + EvaluatorControlProto evaluatorControlProto = new EvaluatorControlProto(); + evaluatorControlProto.timestamp = DateTime.Now.Ticks; + evaluatorControlProto.identifier = Id; + evaluatorControlProto.context_control = contextControlProto; + + Handle(evaluatorControlProto); + } + } + + /// <summary> + /// Forward the EvaluatorControlProto to the EvaluatorRuntime + /// </summary> + /// <param name="proto"></param> + public void Handle(EvaluatorControlProto proto) + { + lock (_evaluatorDescriptor) 
+ { + if (_state == STATE.RUNNING) + { + _evaluatorControlHandler.OnNext(proto); + } + else + { + var e = new InvalidOperationException( + string.Format( + CultureInfo.InvariantCulture, + "Evaluator manager expects to be in {0} state, but instead is in state {1}", + STATE.RUNNING, + _state)); + Exceptions.Throw(e, LOGGER); + } + } + } + + /// <summary> + /// Resource status information from the (actual) resource manager. + /// </summary> + /// <param name="resourceStatusProto"></param> + public void Handle(ResourceStatusProto resourceStatusProto) + { + lock (_evaluatorDescriptor) + { + State resourceState = resourceStatusProto.state; + LOGGER.Log(Level.Info, "Resource manager state update: " + resourceState); + + if (resourceState == State.DONE || resourceState == State.FAILED) + { + if (_state < STATE.DONE) + { + // something is wrong, I think I'm alive but the resource manager runtime says I'm dead + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.Append( + string.Format( + CultureInfo.InvariantCulture, + "The resource manager informed me that Evaluator {0} is in state {1} but I think I am in {2} state", + _evaluatorId, + resourceState, + _state)); + if (resourceStatusProto.diagnostics != null) + { + stringBuilder.Append("Cause: " + resourceStatusProto.diagnostics); + } + if (_runningTask != null) + { + stringBuilder.Append( + string.Format( + CultureInfo.InvariantCulture, + "Taskruntime {0} did not complete before this evaluator died.", + _runningTask.Id)); + } + + // RM is telling me its DONE/FAILED - assuming it has already released the resources + _isResourceReleased = true; + //Handle(new EvaluatorException(_evaluatorId, stringBuilder.ToString(), _runningTask)); + _state = STATE.KILLED; + } + } + } + } + + /// <summary> + /// Handle a context status update + /// </summary> + /// <param name="contextStatusProto"></param> + /// <param name="notifyClientOnNewActiveContext"></param> + private void Handle(ContextStatusProto contextStatusProto, 
bool notifyClientOnNewActiveContext) + { + string contextId = contextStatusProto.context_id; + Optional<string> parentId = contextStatusProto.parent_id != null ? + Optional<string>.Of(contextStatusProto.parent_id) : Optional<string>.Empty(); + if (ContextStatusProto.State.READY == contextStatusProto.context_state) + { + if (!_activeContextIds.Contains(contextId)) + { + EvaluatorContext evaluatorContext = new EvaluatorContext(this, contextId, parentId); + AddEvaluatorContext(evaluatorContext); + if (notifyClientOnNewActiveContext) + { + LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorContext.ToString()); + //TODO + //dispatcher.onNext(ActiveContext.class, context); + } + } + foreach (ContextStatusProto.ContextMessageProto contextMessageProto in contextStatusProto.context_message) + { + byte[] message = contextMessageProto.message; + string sourceId = contextMessageProto.source_id; + LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + sourceId + message); + // this.dispatcher.onNext(ContextMessage.class, + //new ContextMessageImpl(theMessage, contextID, sourceID)); + } + + // A READY context stays active: stop here. Without this return the code below + // removes the context that was just registered and then rejects READY in the + // final "Unknown context state" branch. + return; + } + else + { + if (!_activeContextIds.Contains(contextId)) + { + if (ContextStatusProto.State.FAIL == contextStatusProto.context_state) + { + AddEvaluatorContext(new EvaluatorContext(this, contextId, parentId)); + } + else + { + var e = new InvalidOperationException("unknown context signaling state " + contextStatusProto.context_state); + Exceptions.Throw(e, LOGGER); + } + } + } + + // Only non-READY (FAIL / DONE) status updates reach this point: the context is + // looked up and dropped from the active set before its final state is reported. + EvaluatorContext context = GetEvaluatorContext(contextId); + EvaluatorContext parentContext = context.ParentId.IsPresent() ? + GetEvaluatorContext(context.ParentId.Value) : null; + RemoveEvaluatorContext(context); + + if (ContextStatusProto.State.FAIL == contextStatusProto.context_state) + { + // TODO + Exception reason = new InvalidOperationException(ByteUtilities.ByteArrarysToString(contextStatusProto.error)); + Optional<IActiveContext> optionalParentContext = (null == parentContext) ?
+ Optional<IActiveContext>.Empty() : Optional<IActiveContext>.Of(parentContext); + LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + reason.ToString() + optionalParentContext); + // TODO + //this.dispatcher.onNext(FailedContext.class, + //context.getFailedContext(optionalParentContext, reason)); + } + else if (ContextStatusProto.State.DONE == contextStatusProto.context_state) + { + if (null != parentContext) + { + // TODO + //this.dispatcher.onNext(ClosedContext.class, context.getClosedContext(parentContext)); + } + else + { + LOGGER.Log(Level.Info, "Root context closed. Evaluator closed will trigger final shutdown."); + } + } + else + { + var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown context state {0} for context {1}", contextStatusProto.context_state, contextId)); + Exceptions.Throw(e, LOGGER); + } + } + + /// <summary> + /// Handle task status messages. + /// </summary> + /// <param name="taskStatusProto"></param> + private void Handle(TaskStatusProto taskStatusProto) + { + LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Received task {0} status {1}", taskStatusProto.task_id, taskStatusProto.state)); + string taskId = taskStatusProto.task_id; + string contextId = taskStatusProto.context_id; + State taskState = taskStatusProto.state; + + if (taskState == State.INIT) + { + EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId); + _runningTask = new RunningTaskImpl(this, taskId, evaluatorContext); + // this.dispatcher.onNext(RunningTask.class, this.runningTask); + } + else if (taskState == State.SUSPEND) + { + EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId); + _runningTask = null; + byte[] message = taskStatusProto.result != null ? 
taskStatusProto.result : null; + // message may be null (no result); string concatenation renders null as empty, whereas message.ToString() would throw. + LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorContext + message); + //this.dispatcher.onNext(SuspendedTask.class, new SuspendedTaskImpl(evaluatorContext, message, taskId)); + } + else if (taskState == State.DONE) + { + EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId); + _runningTask = null; + byte[] message = taskStatusProto.result != null ? taskStatusProto.result : null; + // message may be null (no result); string concatenation renders null as empty, whereas message.ToString() would throw. + LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorContext + message); + //this.dispatcher.onNext(CompletedTask.class, new CompletedTaskImpl(evaluatorContext, message, taskId)); + } + else if (taskState == State.FAILED) + { + _runningTask = null; + //EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId); + //FailedTask failedTask = taskStatusProto.result != null ? + // new FailedTask(taskId, ByteUtilities.ByteArrarysToString(taskStatusProto.result), Optional<IActiveContext>.Of(evaluatorContext)) : + // new FailedTask(taskId, "Failed task: " + taskState, Optional<IActiveContext>.Of(evaluatorContext)); + //LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + failedTask.ToString()); + //this.dispatcher.onNext(FailedTask.class, taskException); + } + else if (taskStatusProto.task_message.Count > 0) + { + if (_runningTask != null) + { + var e = new InvalidOperationException("runningTask must be null when there are multiple task messages"); + Exceptions.Throw(e, LOGGER); + } + foreach (TaskStatusProto.TaskMessageProto taskMessageProto in taskStatusProto.task_message) + { + LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + taskMessageProto.ToString()); + // this.dispatcher.onNext(TaskMessage.class, + //new TaskMessageImpl(taskMessageProto.getMessage().toByteArray(), + // taskId, contextId, taskMessageProto.getSourceId())); + } + } + } + + /// <summary> + /// Finds the active context whose Id equals the given id. + /// An InvalidOperationException is handed to Exceptions.Throw when no active context matches. + /// </summary> + /// <param name="id">identifier of the context to look up</param> + /// <returns>the matching active context</returns> + private EvaluatorContext GetEvaluatorContext(string id) + { + foreach (EvaluatorContext context in _activeContexts) + { + if (context.Id.Equals(id)) + { + return context; + } + } + +
// Reached only after every active context has been compared without a match (previously this ran inside the loop, on the first mismatch). + var e = new InvalidOperationException("Unknown evaluator context with id " + id); + Exceptions.Throw(e, LOGGER); + // Presumably unreachable if Exceptions.Throw always throws; kept so every code path returns a value. + return null; + } + + private void AddEvaluatorContext(EvaluatorContext context) + { + _activeContexts.Add(context); + _activeContextIds.Add(context.Id); + } + + private void RemoveEvaluatorContext(EvaluatorContext context) + { + _activeContexts.Remove(context); + _activeContextIds.Remove(context.Id); + } + + [NamedParameter(documentation: "The Evaluator Identifier.")] + public class EvaluatorIdentifier : Name<string> + { + } + + [NamedParameter(documentation: "The Evaluator Host.")] + public class EvaluatorDescriptorName : Name<EvaluatorDescriptorImpl> + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefDriver/FailedJob.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefDriver/FailedJob.cs b/lang/cs/Source/REEF/reef-common/ReefDriver/FailedJob.cs new file mode 100644 index 0000000..b0cde11 --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefDriver/FailedJob.cs @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +using Org.Apache.Reef.Common.Api; +using Org.Apache.Reef.Utilities; +using System; + +namespace Org.Apache.Reef.Driver +{ + /// <summary> + /// An error message that the REEF Client receives when there is a user error in a REEF job. + /// </summary> <remarks>NOTE(review): Id, Message, Description, Cause and Data are re-declared below with 'new', so they hide the inherited members of the same names and, being auto-properties, keep their own storage. The constructor only forwards its arguments to the base constructor and never assigns these properties, so on a freshly constructed FailedJob they hold default values until set - confirm against AbstractFailure whether the hiding is intended.</remarks> + public class FailedJob : AbstractFailure + { + /// <summary> + /// Creates an error message from the entity ID and the exception supplied as its cause; both arguments are forwarded unchanged to the base constructor. + /// </summary> + /// <param name="id">Entity ID passed to the base constructor.</param> + /// <param name="cause">Exception passed to the base constructor.</param> + public FailedJob(string id, Exception cause) + : base(id, cause) + { + } + + public new string Id { get; set; } + + public new string Message { get; set; } + + public new Optional<string> Description { get; set; } + + public new Optional<Exception> Cause { get; set; } + + public new Optional<byte[]> Data { get; set; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefDriver/IDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefDriver/IDriver.cs b/lang/cs/Source/REEF/reef-common/ReefDriver/IDriver.cs new file mode 100644 index 0000000..b0a37dc --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefDriver/IDriver.cs @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.Reef.Driver +{ + /// <summary> + /// Empty driver interface (it declares no members) that exists to facilitate referencing the driver DLL. + /// </summary> + public interface IDriver + { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefDriver/IStartHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefDriver/IStartHandler.cs b/lang/cs/Source/REEF/reef-common/ReefDriver/IStartHandler.cs new file mode 100644 index 0000000..64ea1f4 --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefDriver/IStartHandler.cs @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +namespace Org.Apache.Reef.Driver +{ + /** <summary>Start handler interface; its only member is the Identifier property.</summary> */ public interface IStartHandler + { + /** <summary>Gets or sets the identifier string. NOTE(review): what the identifier refers to is not visible here - confirm with the implementers.</summary> */ string Identifier { get; set; } + } +} \ No newline at end of file
