// http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/DriverManager.cs
// ----------------------------------------------------------------------
// diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverManager.cs b/lang/cs/Org.Apache.REEF.Driver/DriverManager.cs
// new file mode 100644
// index 0000000..5c3c19c
// --- /dev/null
// +++ b/lang/cs/Org.Apache.REEF.Driver/DriverManager.cs
// @@ -0,0 +1,541 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using Org.Apache.REEF.Common;
using Org.Apache.REEF.Common.Api;
using Org.Apache.REEF.Common.Catalog;
using Org.Apache.REEF.Common.Evaluator;
using Org.Apache.REEF.Common.Exceptions;
using Org.Apache.REEF.Common.ProtoBuf.DriverRuntimeProto;
using Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto;
using Org.Apache.REEF.Common.ProtoBuf.ReefProtocol;
using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto;
using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Tang.Implementations;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Wake.Remote;
using Org.Apache.REEF.Wake.Time;
using Org.Apache.REEF.Wake.Time.Runtime.Event;
using System;
using System.Collections.Generic;
using System.Globalization;
using Org.Apache.REEF.Tang.Implementations.InjectionPlan;

namespace Org.Apache.REEF.Driver
{
    /// <summary>
    /// Receives runtime, resource and clock events on the driver side. It keeps one
    /// <see cref="EvaluatorManager"/> per allocated evaluator id, routes resource status,
    /// runtime errors and heartbeats to the matching manager, and disposes the clock when
    /// the runtime reports DONE/FAILED or is running but idle.
    /// </summary>
    public class DriverManager :
        IEvaluatorRequestor,
        IObserver<RuntimeStatusProto>,
        IObserver<ResourceStatusProto>,
        IObserver<ResourceAllocationProto>,
        IObserver<NodeDescriptorProto>,
        IObserver<RuntimeStart>,
        IObserver<RuntimeStop>,
        IObserver<IdleClock>
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(DriverManager));

        private readonly IInjector _injector;

        private readonly IInjectionFuture<IClock> _clockFuture;

        private readonly ResourceCatalogImpl _resourceCatalog;

        // Only referenced by the disabled call at the end of Submit().
        private readonly IInjectionFuture<IResourceRequestHandler> _futureResourceRequestHandler;

        // Evaluator id -> manager. The dictionary instance is also the monitor that guards
        // every read and write of the map, hence readonly.
        private readonly Dictionary<string, EvaluatorManager> _evaluators = new Dictionary<string, EvaluatorManager>();

        private readonly EvaluatorHeartBeatSanityChecker _sanityChecker = new EvaluatorHeartBeatSanityChecker();

        private readonly ClientJobStatusHandler _clientJobStatusHandler;

        // The three members below are never given a non-null value anywhere in this class,
        // so every use is null-checked.
        private IDisposable _heartbeatConnectionChannel;

        private IDisposable _errorChannel;

        private IObserver<RuntimeErrorProto> _runtimeErrorHandler;

        // Last known runtime status. This used to be a property whose setter assigned to
        // itself (unbounded recursion on the first write) and whose getter returned a brand
        // new INIT proto on every read; a plain field keeps the state that was stored.
        private RuntimeStatusProto _runtimeStatusProto = CreateRuntimeStatus(State.INIT);

        /// <summary>
        /// Creates the manager. The heartbeat channel, error channel and runtime error handler
        /// start out as null.
        /// </summary>
        /// <param name="injector">used to instantiate EvaluatorManager objects</param>
        /// <param name="resourceCatalog">catalog that receives node descriptors and resolves node ids</param>
        /// <param name="remoteManager">not used by this constructor</param>
        /// <param name="clockFuture">future of the clock that is disposed on shutdown</param>
        /// <param name="futureResourceRequestHandler">future of the resource request handler</param>
        /// <param name="clientJobStatusHandler">disposed when the runtime stops</param>
        /// <param name="clientRId">not used by this constructor</param>
        public DriverManager(
            IInjector injector,
            ResourceCatalogImpl resourceCatalog,
            IRemoteManager<REEFMessage> remoteManager,
            IInjectionFuture<IClock> clockFuture,
            IInjectionFuture<IResourceRequestHandler> futureResourceRequestHandler,
            ClientJobStatusHandler clientJobStatusHandler,
            string clientRId)
        {
            _injector = injector;
            _clockFuture = clockFuture;
            _resourceCatalog = resourceCatalog;
            _futureResourceRequestHandler = futureResourceRequestHandler;
            _clientJobStatusHandler = clientJobStatusHandler;

            _heartbeatConnectionChannel = null;
            _errorChannel = null;
            _runtimeErrorHandler = null;
            LOGGER.Log(Level.Info, "DriverManager instantiated");
        }

        /// <summary>
        /// The resource catalog handed to the constructor. The setter ignores the assigned value.
        /// </summary>
        public IResourceCatalog ResourceCatalog
        {
            get
            {
                return _resourceCatalog;
            }

            set
            {
            }
        }

        /// <summary>
        /// Builds a ResourceRequestProto from the request (count and, when positive, memory).
        /// NOTE(review): the proto is never forwarded because the handler call at the end is
        /// commented out - confirm whether that is still intended.
        /// </summary>
        /// <param name="request">the evaluator request to translate</param>
        public void Submit(IEvaluatorRequest request)
        {
            LOGGER.Log(Level.Info, "Got an EvaluatorRequest");
            ResourceRequestProto proto = new ResourceRequestProto();
            //TODO: request.size deprecated should use megabytes instead
            //switch (request.Size)
            //{
            //        case EvaluatorRequest.EvaluatorSize.SMALL:
            //        proto.resource_size = SIZE.SMALL;
            //        break;
            //        case EvaluatorRequest.EvaluatorSize.MEDIUM:
            //        proto.resource_size = SIZE.MEDIUM;
            //        break;
            //        case EvaluatorRequest.EvaluatorSize.LARGE:
            //        proto.resource_size = SIZE.LARGE;
            //        break;
            //        case EvaluatorRequest.EvaluatorSize.XLARGE:
            //        proto.resource_size = SIZE.XLARGE;
            //        break;
            //    default:
            //        throw new InvalidOperationException("invalid request size" + request.Size);
            //}
            proto.resource_count = request.Number;
            if (request.MemoryMegaBytes > 0)
            {
                proto.memory_size = request.MemoryMegaBytes;
            }

            //final ResourceCatalog.Descriptor descriptor = req.getDescriptor();
            //if (descriptor != null) {
            //  if (descriptor instanceof RackDescriptor) {
            //    request.addRackName(descriptor.getName());
            //  } else if (descriptor instanceof NodeDescriptor) {
            //    request.addNodeName(descriptor.getName());
            //  }
            //}

            //_futureResourceRequestHandler.Get().OnNext(proto);
        }

        /// <summary>
        /// Removes the given evaluator manager from the map of known evaluators and reports an
        /// InvalidOperationException through Exceptions.Throw when its id is not registered.
        /// </summary>
        /// <param name="evaluatorManager">the manager to forget</param>
        public void Release(EvaluatorManager evaluatorManager)
        {
            // Same monitor as every other access to _evaluators (this used to lock on `this`,
            // which did not exclude the handlers below).
            lock (_evaluators)
            {
                string evaluatorManagerId = evaluatorManager.Id;
                if (!_evaluators.Remove(evaluatorManagerId))
                {
                    var e = new InvalidOperationException("Trying to remove an unknown evaluator manager with id " + evaluatorManagerId);
                    Exceptions.Throw(e, LOGGER);
                }
            }
        }

        /// <summary>
        /// This handles runtime error occurs on the evaluator
        /// </summary>
        /// <param name="runtimeErrorProto"></param>
        public void Handle(RuntimeErrorProto runtimeErrorProto)
        {
            FailedRuntime error = new FailedRuntime(runtimeErrorProto);
            LOGGER.Log(Level.Warning, "Runtime error:" + error);

            EvaluatorException evaluatorException = error.Cause != null
                ? new EvaluatorException(error.Id, error.Cause.Value)
                : new EvaluatorException(error.Id, "Runtime error");

            EvaluatorManager evaluatorManager;
            lock (_evaluators)
            {
                if (!_evaluators.TryGetValue(error.Id, out evaluatorManager))
                {
                    LOGGER.Log(Level.Warning, "Unknown evaluator runtime error: " + error.Cause);
                }
            }

            // The manager is invoked outside the lock, as before.
            if (null != evaluatorManager)
            {
                evaluatorManager.Handle(evaluatorException);
            }
        }

        /// <summary>
        /// A RuntimeStatusProto comes from the ResourceManager layer indicating its current status
        /// </summary>
        /// <param name="runtimeStatusProto"></param>
        public void OnNext(RuntimeStatusProto runtimeStatusProto)
        {
            Handle(runtimeStatusProto);
        }

        /// <summary>
        /// A ResourceStatusProto message comes from the ResourceManager layer to indicate what it thinks
        /// about the current state of a given resource. Ideally, we should think the same thing.
        /// </summary>
        /// <param name="resourceStatusProto"></param>
        public void OnNext(ResourceStatusProto resourceStatusProto)
        {
            Handle(resourceStatusProto);
        }

        /// <summary>
        /// A ResourceAllocationProto indicates a resource allocation given by the ResourceManager layer.
        /// </summary>
        /// <param name="resourceAllocationProto"></param>
        public void OnNext(ResourceAllocationProto resourceAllocationProto)
        {
            Handle(resourceAllocationProto);
        }

        /// <summary>
        /// A NodeDescriptorProto defines a new node in the cluster. We should add this to the resource catalog
        /// so that clients can make resource requests against it.
        /// </summary>
        /// <param name="nodeDescriptorProto"></param>
        public void OnNext(NodeDescriptorProto nodeDescriptorProto)
        {
            _resourceCatalog.Handle(nodeDescriptorProto);
        }

        /// <summary>
        /// This EventHandler is subscribed to the StartTime event of the Clock statically. It therefore provides the entrance
        /// point to REEF. Records the runtime status as RUNNING with no outstanding container requests.
        /// </summary>
        /// <param name="runtimeStart"></param>
        public void OnNext(RuntimeStart runtimeStart)
        {
            LOGGER.Log(Level.Info, "RuntimeStart: " + runtimeStart);
            _runtimeStatusProto = CreateRuntimeStatus(State.RUNNING);
        }

        /// <summary>
        /// Handles RuntimeStop: forwards the stop exception (if any) as a runtime error, disposes
        /// every known evaluator manager, then disposes the channels and the client job status handler.
        /// </summary>
        /// <param name="runtimeStop"></param>
        public void OnNext(RuntimeStop runtimeStop)
        {
            LOGGER.Log(Level.Info, "RuntimeStop: " + runtimeStop);
            if (runtimeStop.Exception != null)
            {
                string exceptionMessage = runtimeStop.Exception.Message;
                LOGGER.Log(Level.Warning, "Sending runtime error:" + exceptionMessage);
                RuntimeErrorProto runtimeErrorProto = new RuntimeErrorProto();
                runtimeErrorProto.message = exceptionMessage;
                runtimeErrorProto.exception = ByteUtilities.StringToByteArrays(exceptionMessage);
                runtimeErrorProto.name = "REEF";
                ForwardRuntimeError(runtimeErrorProto);

                LOGGER.Log(Level.Warning, "DONE Sending runtime error: " + exceptionMessage);
            }

            lock (_evaluators)
            {
                // Iterate over a snapshot: EvaluatorManager.Dispose is not fully visible from
                // here and may call back into Release(), which mutates the dictionary.
                foreach (EvaluatorManager evaluatorManager in new List<EvaluatorManager>(_evaluators.Values))
                {
                    LOGGER.Log(Level.Warning, "Unclean shutdown of evaluator: " + evaluatorManager.Id);
                    evaluatorManager.Dispose();
                }
            }

            try
            {
                if (_heartbeatConnectionChannel != null)
                {
                    _heartbeatConnectionChannel.Dispose();
                }
                if (_errorChannel != null)
                {
                    _errorChannel.Dispose();
                }
                Optional<Exception> cause = runtimeStop.Exception != null ?
                    Optional<Exception>.Of(runtimeStop.Exception) : Optional<Exception>.Empty();
                _clientJobStatusHandler.Dispose(cause);

                LOGGER.Log(Level.Info, "driver manager closed");
            }
            catch (Exception e)
            {
                Exceptions.Caught(e, Level.Error, "Error disposing Driver manager", LOGGER);

                // Keep the original failure attached as the inner exception.
                Exceptions.Throw(new InvalidOperationException("Cannot dispose driver manager", e), LOGGER);
            }
        }

        /// <summary>
        /// Logs the clock and runtime state and disposes the clock when the runtime is RUNNING
        /// with no outstanding container requests and no container allocations.
        /// </summary>
        /// <param name="value">the idle clock event</param>
        public void OnNext(IdleClock value)
        {
            string message = string.Format(
                CultureInfo.InvariantCulture,
                "IdleClock: [{0}], RuntimeState [{1}], Outstanding container requests [{2}], Container allocation count[{3}]",
                value + Environment.NewLine,
                _runtimeStatusProto.state + Environment.NewLine,
                _runtimeStatusProto.outstanding_container_requests + Environment.NewLine,
                _runtimeStatusProto.container_allocation.Count);
            LOGGER.Log(Level.Info, message);

            lock (_evaluators)
            {
                if (_runtimeStatusProto.state == State.RUNNING
                    && _runtimeStatusProto.outstanding_container_requests == 0
                    && _runtimeStatusProto.container_allocation.Count == 0)
                {
                    LOGGER.Log(Level.Info, "Idle runtime shutdown");
                    _clockFuture.Get().Dispose();
                }
            }
        }

        void IObserver<IdleClock>.OnError(Exception error)
        {
            throw new NotImplementedException();
        }

        void IObserver<IdleClock>.OnCompleted()
        {
            throw new NotImplementedException();
        }

        void IObserver<RuntimeStop>.OnError(Exception error)
        {
            throw new NotImplementedException();
        }

        void IObserver<RuntimeStop>.OnCompleted()
        {
            throw new NotImplementedException();
        }

        void IObserver<RuntimeStart>.OnError(Exception error)
        {
            throw new NotImplementedException();
        }

        void IObserver<RuntimeStart>.OnCompleted()
        {
            throw new NotImplementedException();
        }

        void IObserver<NodeDescriptorProto>.OnError(Exception error)
        {
            throw new NotImplementedException();
        }

        void IObserver<NodeDescriptorProto>.OnCompleted()
        {
            throw new NotImplementedException();
        }

        void IObserver<ResourceAllocationProto>.OnError(Exception error)
        {
            throw new NotImplementedException();
        }

        void IObserver<ResourceAllocationProto>.OnCompleted()
        {
            throw new NotImplementedException();
        }

        void IObserver<ResourceStatusProto>.OnError(Exception error)
        {
            throw new NotImplementedException();
        }

        void IObserver<ResourceStatusProto>.OnCompleted()
        {
            throw new NotImplementedException();
        }

        void IObserver<RuntimeStatusProto>.OnError(Exception error)
        {
            throw new NotImplementedException();
        }

        void IObserver<RuntimeStatusProto>.OnCompleted()
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// Creates a runtime status message named "REEF" in the given state with zero
        /// outstanding container requests.
        /// </summary>
        /// <param name="state">state to record in the message</param>
        /// <returns>the new status message</returns>
        private static RuntimeStatusProto CreateRuntimeStatus(State state)
        {
            RuntimeStatusProto proto = new RuntimeStatusProto();
            proto.state = state;
            proto.name = "REEF";
            proto.outstanding_container_requests = 0;
            return proto;
        }

        /// <summary>
        /// Passes the error to the runtime error handler, or logs a warning and drops it when
        /// no handler is set.
        /// </summary>
        /// <param name="runtimeErrorProto">the error to forward</param>
        private void ForwardRuntimeError(RuntimeErrorProto runtimeErrorProto)
        {
            if (_runtimeErrorHandler == null)
            {
                LOGGER.Log(Level.Warning, "No runtime error handler is set; runtime error is not forwarded");
                return;
            }
            _runtimeErrorHandler.OnNext(runtimeErrorProto);
        }

        /// <summary>
        /// Something went wrong at the runtime layer (either driver or evaluator). This
        /// method simply forwards the RuntimeErrorProto to the client via the RuntimeErrorHandler
        /// and then disposes the clock.
        /// </summary>
        /// <param name="runtimeErrorProto"></param>
        private void Fail(RuntimeErrorProto runtimeErrorProto)
        {
            ForwardRuntimeError(runtimeErrorProto);
            _clockFuture.Get().Dispose();
        }

        /// <summary>
        /// Helper method to create a new EvaluatorManager instance
        /// </summary>
        /// <param name="id">identifier of the Evaluator</param>
        /// <param name="descriptor"> NodeDescriptor on which the Evaluator executes.</param>
        /// <returns>new EvaluatorManager instance.</returns>
        private EvaluatorManager GetNewEvaluatorManagerInstance(string id, EvaluatorDescriptorImpl descriptor)
        {
            LOGGER.Log(Level.Info, "Creating Evaluator Manager: " + id);

            // TODO bindVolatileParameter: id and descriptor are not yet handed to the injector.
            return (EvaluatorManager)_injector.GetInstance(typeof(EvaluatorManager));
        }

        /// <summary>
        /// Receives and routes heartbeats from Evaluators.
        /// </summary>
        /// <param name="evaluatorHearBeatProto"></param>
        private void Handle(IRemoteMessage<EvaluatorHeartbeatProto> evaluatorHearBeatProto)
        {
            EvaluatorHeartbeatProto heartbeat = evaluatorHearBeatProto.Message;
            EvaluatorStatusProto status = heartbeat.evaluator_status;
            string evaluatorId = status.evaluator_id;
            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Heartbeat from Evaluator {0} with state {1} timestamp {2}", evaluatorId, status.state, heartbeat.timestamp));
            _sanityChecker.check(evaluatorId, heartbeat.timestamp);

            lock (_evaluators)
            {
                EvaluatorManager evaluatorManager;
                if (_evaluators.TryGetValue(evaluatorId, out evaluatorManager))
                {
                    evaluatorManager.Handle(evaluatorHearBeatProto);
                }
                else
                {
                    string msg = "Contact from unkonwn evaluator with id: " + evaluatorId;

                    // NOTE(review): status was already dereferenced above, so this check can
                    // never be false when this line is reached.
                    if (heartbeat.evaluator_status != null)
                    {
                        msg += " with state" + status.state;
                    }
                    LOGGER.Log(Level.Error, msg);
                    Exceptions.Throw(new InvalidOperationException(msg), LOGGER);
                }
            }
        }

        /// <summary>
        /// This resource status message comes from the ResourceManager layer; telling me what it thinks
        /// about the state of the resource executing an Evaluator; This method simply passes the message
        /// off to the referenced EvaluatorManager
        /// </summary>
        /// <param name="resourceStatusProto"></param>
        private void Handle(ResourceStatusProto resourceStatusProto)
        {
            lock (_evaluators)
            {
                EvaluatorManager evaluatorManager;
                if (_evaluators.TryGetValue(resourceStatusProto.identifier, out evaluatorManager))
                {
                    evaluatorManager.Handle(resourceStatusProto);
                }
                else
                {
                    var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown resource status from evaluator {0} with state {1}", resourceStatusProto.identifier, resourceStatusProto.state));
                    Exceptions.Throw(e, LOGGER);
                }
            }
        }

        /// <summary>
        /// This method handles resource allocations by creating a new EvaluatorManager instance.
        /// </summary>
        /// <param name="resourceAllocationProto"></param>
        private void Handle(ResourceAllocationProto resourceAllocationProto)
        {
            lock (_evaluators)
            {
                try
                {
                    INodeDescriptor nodeDescriptor = _resourceCatalog.GetNode(resourceAllocationProto.node_id);
                    if (nodeDescriptor == null)
                    {
                        Exceptions.Throw(new InvalidOperationException("Unknown resurce: " + resourceAllocationProto.node_id), LOGGER);
                    }
                    EvaluatorDescriptorImpl evaluatorDescriptor = new EvaluatorDescriptorImpl(nodeDescriptor, EvaluatorType.UNDECIDED, resourceAllocationProto.resource_memory, resourceAllocationProto.virtual_cores);
                    LOGGER.Log(Level.Info, "Resource allocation: new evaluator id: " + resourceAllocationProto.identifier);
                    EvaluatorManager evaluatorManager = GetNewEvaluatorManagerInstance(resourceAllocationProto.identifier, evaluatorDescriptor);
                    _evaluators.Add(resourceAllocationProto.identifier, evaluatorManager);
                }
                catch (Exception e)
                {
                    Exceptions.Caught(e, Level.Error, LOGGER);

                    // Keep the original failure attached as the inner exception.
                    Exceptions.Throw(new InvalidOperationException("Error handling resourceAllocationProto.", e), LOGGER);
                }
            }
        }

        /// <summary>
        /// Reacts to a runtime status update: FAILED forwards the error and disposes the clock,
        /// DONE disposes the clock, RUNNING stores the status and disposes the clock when the
        /// clock is idle and there are no outstanding requests or allocations.
        /// </summary>
        /// <param name="runtimeStatusProto">the status reported by the runtime</param>
        private void Handle(RuntimeStatusProto runtimeStatusProto)
        {
            State runtimeState = runtimeStatusProto.state;
            LOGGER.Log(Level.Info, "Runtime status: " + runtimeStatusProto.state);

            switch (runtimeState)
            {
                case State.FAILED:
                    Fail(runtimeStatusProto.error);
                    break;
                case State.DONE:
                    _clockFuture.Get().Dispose();
                    break;
                case State.RUNNING:
                    lock (_evaluators)
                    {
                        _runtimeStatusProto = runtimeStatusProto;
                        if (_clockFuture.Get().IsIdle()
                            && runtimeStatusProto.outstanding_container_requests == 0
                            && runtimeStatusProto.container_allocation.Count == 0)
                        {
                            _clockFuture.Get().Dispose();
                        }
                    }
                    break;
            }
        }
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfiguration.cs
// ----------------------------------------------------------------------
// diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfiguration.cs b/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfiguration.cs
// new file mode 100644
// index 0000000..d329ee6
// --- /dev/null
// +++ b/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfiguration.cs
// @@ -0,0 +1,64 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using Org.Apache.REEF.Common;
using Org.Apache.REEF.Common.Api;
using Org.Apache.REEF.Common.Catalog;
using Org.Apache.REEF.Common.Client;
using Org.Apache.REEF.Common.Evaluator;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Util;

namespace Org.Apache.REEF.Driver
{
    /// <summary>
    /// Configuration module builder holding the driver runtime bindings: the resource catalog,
    /// the job message observer, the job message/exception handlers and the job control handler.
    /// </summary>
    public class DriverRuntimeConfiguration : ConfigurationModuleBuilder
    {
        /// <summary>
        /// Assembles the module from scratch on every access. The bindings are applied in the
        /// order listed below.
        /// </summary>
        public static ConfigurationModule ConfigurationModule
        {
            get
            {
                // Resource Catalog
                var withResourceCatalog = new DriverRuntimeConfiguration()
                    .BindImplementation(GenericType<IResourceCatalog>.Class, GenericType<ResourceCatalogImpl>.Class);

                // JobMessageObserver
                //.BindImplementation(GenericType<IEvaluatorRequestor>.Class, GenericType<DriverManager>.Class)
                var withJobMessageObserver = withResourceCatalog
                    .BindImplementation(GenericType<IJobMessageObserver>.Class, GenericType<ClientJobStatusHandler>.Class);

                // JobMessageObserver Wake event handler bindings
                var withJobHandlers = withJobMessageObserver
                    .BindNamedParameter(GenericType<DriverRuntimeConfigurationOptions.JobMessageHandler>.Class, GenericType<ClientJobStatusHandler>.Class)
                    .BindNamedParameter(GenericType<DriverRuntimeConfigurationOptions.JobExceptionHandler>.Class, GenericType<ClientJobStatusHandler>.Class);

                // Client manager
                var withClientManager = withJobHandlers
                    .BindNamedParameter(GenericType<DriverRuntimeConfigurationOptions.JobControlHandler>.Class, GenericType<ClientManager>.Class);

                // Bind the runtime parameters
                //.BindNamedParameter(GenericType<RuntimeParameters.NodeDescriptorHandler>.Class, GenericType<DriverManager>.Class)
                //.BindNamedParameter(GenericType<RuntimeParameters.ResourceAllocationHandler>.Class, GenericType<DriverManager>.Class)
                //.BindNamedParameter(GenericType<RuntimeParameters.ResourceStatusHandler>.Class, GenericType<DriverManager>.Class)
                //.BindNamedParameter(GenericType<RuntimeParameters.RuntimeStatusHandler>.Class, GenericType<DriverManager>.Class)

                // Bind to the Clock
                //.BindSetEntry(GenericType<IClock.RuntimeStopHandler>.Class, GenericType<DriverManager>.Class)
                return withClientManager.Build();
            }
        }
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfigurationOptions.cs
// ----------------------------------------------------------------------
// diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfigurationOptions.cs
// new file mode 100644
// index 0000000..085e573
// --- /dev/null
// +++ b/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfigurationOptions.cs
// @@ -0,0 +1,42 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
+ */ + +using Org.Apache.REEF.Common; +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Driver +{ + public class DriverRuntimeConfigurationOptions + { + [NamedParameter(documentation: "Job message handler (see ClientJobStatusHandler)")] + public class JobMessageHandler : Name<ClientJobStatusHandler> + { + } + + [NamedParameter(documentation: "Job exception handler (see ClientJobStatusHandler)")] + public class JobExceptionHandler : Name<ClientJobStatusHandler> + { + } + + [NamedParameter(documentation: "Called when a job control message is received by the client.")] + public class JobControlHandler : Name<ClientManager> + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/DriverSubmissionSettings.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverSubmissionSettings.cs b/lang/cs/Org.Apache.REEF.Driver/DriverSubmissionSettings.cs new file mode 100644 index 0000000..15cb8ca --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/DriverSubmissionSettings.cs @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Driver +{ + // TODO: merge with EvaluatorConfigurations class + public class DriverSubmissionSettings + { + // default to "ReefDevClrBridge" + private string _driverIdentifier; + + // default to _defaultSubmissionDirectory is not provided + private string _submissionDirectory; + + // deault to 512MB if no value is provided + private int _driverMemory = 0; + + // default value, client wait till driver exit + private int _clientWaitTime = -1; + + // default to submit to driver with driver config + private bool _submit = true; + + // default to always update jar before submission + private bool _updateJar = true; + + // default to run on local + private bool _runOnYarn; + + // default to set to info logging + private JavaLoggingSetting _javaLogLevel = JavaLoggingSetting.INFO; + + /// <summary> + /// Whether to update jar file with needed dlls before submission + /// User can choose to reduce startup time by skipping the update, if jar file already contains all necessary dll + /// Note this settig is .NET only, it does not propagate to java side. + /// </summary> + public bool UpdateJarBeforeSubmission + { + get { return _updateJar; } + set { _updateJar = value; } + } + + /// <summary> + /// Determine the vebosity of Java driver's log. + /// Note this parameter is used when launching java process only, it does not propagate to java side. 
+ /// </summary> + public JavaLoggingSetting JavaLogLevel + { + get { return _javaLogLevel; } + set { _javaLogLevel = value; } + } + + /// <summary> + /// Memory allocated for driver, default to 512 MB + /// </summary> + public int DriverMemory + { + get + { + return _driverMemory; + } + + set + { + if (value < 0) + { + throw new ArgumentException("driver memory cannot be negatvie value."); + } + _driverMemory = value; + } + } + + /// <summary> + /// Driver Identifier, default to "ReefDevClrBridge" + /// </summary> + public string DriverIdentifier + { + get + { + return _driverIdentifier; + } + + set + { + _driverIdentifier = value; + } + } + + /// <summary> + /// Whether to submit driver with config after driver configuration is construted, default to True + /// </summary> + public bool Submit + { + get + { + return _submit; + } + + set + { + _submit = value; + } + } + + /// <summary> + /// How long client would wait for Driver, default to wait till driver is done + /// </summary> + public int ClientWaitTime + { + get + { + return _clientWaitTime; + } + + set + { + _clientWaitTime = value; + } + } + + /// <summary> + /// Driver job submission directory in (H)DFS where jar file shall be uploaded, default to a tmp directory with GUID name + /// If set by CLR user, the user must guarantee the uniquness of the directory across multiple jobs + /// </summary> + public string SubmissionDirectory + { + get + { + return _submissionDirectory; + } + + set + { + _submissionDirectory = value; + } + } + + /// <summary> + /// Whether to Run on YARN runtime, default to false + /// </summary> + public bool RunOnYarn + { + get + { + return _runOnYarn; + } + + set + { + _runOnYarn = value; + } + } + + public string ToComamndLineArguments() + { + return + (RunOnYarn ? " -local false" : string.Empty) + + (!Submit ? " -submit false" : string.Empty) + + (DriverMemory > 0 ? " -driver_memory " + DriverMemory : string.Empty) + + (!string.IsNullOrWhiteSpace(DriverIdentifier) ? 
" -drive_id " + DriverIdentifier : string.Empty) + + (ClientWaitTime > 0 ? " -wait_time " + ClientWaitTime : string.Empty) + + (!string.IsNullOrWhiteSpace(SubmissionDirectory) ? " -submission_directory " + SubmissionDirectory : string.Empty); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/EvaluatorManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/EvaluatorManager.cs b/lang/cs/Org.Apache.REEF.Driver/EvaluatorManager.cs new file mode 100644 index 0000000..b8ebef9 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/EvaluatorManager.cs @@ -0,0 +1,655 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+using Org.Apache.REEF.Common.Api;
+using Org.Apache.REEF.Common.Catalog;
+using Org.Apache.REEF.Common.Evaluator;
+using Org.Apache.REEF.Common.Exceptions;
+using Org.Apache.REEF.Common.ProtoBuf.DriverRuntimeProto;
+using Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Time;
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Linq;
+using System.Text;
+
+using TaskMessage = Org.Apache.REEF.Tasks.TaskMessage;
+
+namespace Org.Apache.REEF.Driver
+{
+    /// <summary>
+    /// Manages a single Evaluator instance including all lifecycle instances:
+    /// (AllocatedEvaluator, CompletedEvaluator, FailedEvaluator).
+    /// A (periodic) heartbeat channel is established EvaluatorRuntime -> EvaluatorManager.
+    /// The EvaluatorRuntime will (periodically) send (status) messages to the EvaluatorManager using this heartbeat channel.
+    /// A (push-based) EventHandler channel is established EvaluatorManager -> EvaluatorRuntime.
+    /// The EvaluatorManager uses this to forward Driver messages, launch Tasks, and initiate
+    /// control information (e.g., shutdown, suspend).
+    /// </summary>
+    public class EvaluatorManager : IDisposable, IIdentifiable
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorManager));
+
+        // Private monitor guarding the mutable state of this manager. The evaluator descriptor is
+        // handed out through a public property (and may be null), so it is not used as the lock.
+        private readonly object _lock = new object();
+
+        private STATE _state = STATE.ALLOCATED;
+
+        private readonly IClock _clock;
+
+        // TODO
+        // private final RemoteManager remoteManager;
+        private readonly DriverManager _driverManager;
+
+        private readonly IResourceReleaseHandler _resourceReleaseHandler;
+
+        private readonly IResourceLaunchHandler _resourceLaunchHandler;
+
+        private readonly EvaluatorDescriptorImpl _evaluatorDescriptor;
+
+        private readonly string _evaluatorId;
+
+        // The list and the id set are always updated together (see Add/RemoveEvaluatorContext).
+        private readonly IList<EvaluatorContext> _activeContexts = new List<EvaluatorContext>();
+
+        private readonly HashSet<string> _activeContextIds = new HashSet<string>();
+
+        private IRunningTask _runningTask = null;
+
+        // NOTE(review): nothing in this class assigns the control handler yet (the remote manager
+        // lookup is still a TODO in the heartbeat handler), so it is null at runtime for now.
+        private IObserver<EvaluatorControlProto> _evaluatorControlHandler = null;
+
+        private bool _isResourceReleased = false;
+
+        //TODO
+        //private final DispatchingEStage dispatcher;
+        private EvaluatorType _type = EvaluatorType.CLR;
+
+        /// <summary>
+        /// Creates a manager for one allocated evaluator. Only the clock, driver manager, resource
+        /// handlers, evaluator id and evaluator descriptor are stored; the event handler sets are
+        /// accepted but not used yet because the event dispatcher has not been ported.
+        /// </summary>
+        public EvaluatorManager(
+            IClock clock,
+            //RemoteManager remoteManager,
+            DriverManager driverManager,
+            IResourceReleaseHandler resourceReleaseHandler,
+            IResourceLaunchHandler resourceLaunchHandler,
+            //REEFErrorHandler errorHandler,
+            string evaluatorId,
+            EvaluatorDescriptorImpl evaluatorDescriptor,
+            ISet<IObservable<IActiveContext>> activeContextEventHandler,
+            ISet<IObservable<IClosedContext>> closedContextEventHandlers,
+            ISet<IObservable<FailedContext>> failedContextEventHandlers,
+            ISet<IObservable<ContextMessage>> contextMessageHandlers,
+            ISet<IObservable<IRunningTask>> runningTaskEventHandlers,
+            ISet<IObservable<ICompletedTask>> completedTaskEventHandlers,
+            ISet<IObservable<ISuspendedTask>> suspendedTaskEventHandlers,
+            ISet<IObservable<TaskMessage>> taskMessageEventHandlers,
+            ISet<IObservable<FailedTask>> taskExceptionEventHandlers,
+            ISet<IObservable<IAllocatedEvaluator>> allocatedEvaluatorEventHandlers,
+            ISet<IObservable<IFailedEvaluator>> failedEvaluatorHandlers,
+            ISet<IObservable<ICompletedEvaluator>> completedEvaluatorHandlers)
+        {
+            _clock = clock;
+            //_remoteManager = remoteManager;
+            _driverManager = driverManager;
+            _resourceReleaseHandler = resourceReleaseHandler;
+            _resourceLaunchHandler = resourceLaunchHandler;
+            _evaluatorId = evaluatorId;
+            _evaluatorDescriptor = evaluatorDescriptor;
+
+            // TODO: port the dispatcher.
+            //this.dispatcher = new DispatchingEStage(errorHandler, 16); // 16 threads
+            // It has to register the handler sets for ActiveContext, ClosedContext, FailedContext,
+            // ContextMessage, RunningTask, CompletedTask, SuspendedTask, TaskMessage, FailedTask,
+            // FailedEvaluator, CompletedEvaluator and AllocatedEvaluator, and then publish:
+            //this.dispatcher.onNext(AllocatedEvaluator.class,
+            //    new AllocatedEvaluatorImpl(this, remoteManager.getMyIdentifier()));
+        }
+
+        /// <summary>
+        /// Various states that the EvaluatorManager could be in. The EvaluatorManager is created when a resource has been allocated by the ResourceManager.
+        /// The declaration order matters: the handlers compare states with &lt; and &gt;= against DONE.
+        /// </summary>
+        public enum STATE
+        {
+            ALLOCATED,  // initial state
+            SUBMITTED,  // client called AllocatedEvaluator.submitTask() and we're waiting for first contact
+            RUNNING,    // first contact received, all communication channels established, Evaluator sent to client.
+            DONE,       // clean shutdown
+            FAILED,     // some failure occurred.
+            KILLED      // unclean shutdown
+        }
+
+        /// <summary>
+        /// The descriptor this manager was constructed with.
+        /// </summary>
+        public IEvaluatorDescriptor EvaluatorDescriptor
+        {
+            get
+            {
+                return _evaluatorDescriptor;
+            }
+        }
+
+        /// <summary>
+        /// The node descriptor taken from <see cref="EvaluatorDescriptor"/>.
+        /// </summary>
+        public INodeDescriptor NodeDescriptor
+        {
+            get
+            {
+                return EvaluatorDescriptor.NodeDescriptor;
+            }
+        }
+
+        /// <summary>
+        /// The task currently tracked as running on this evaluator, or null if there is none.
+        /// </summary>
+        public IRunningTask RunningTask
+        {
+            get
+            {
+                lock (_lock)
+                {
+                    return _runningTask;
+                }
+            }
+        }
+
+        /// <summary>
+        /// The evaluator identifier given at construction. The setter intentionally does nothing;
+        /// the identifier cannot be changed after construction.
+        /// </summary>
+        public string Id
+        {
+            get
+            {
+                return _evaluatorId;
+            }
+
+            set
+            {
+            }
+        }
+
+        /// <summary>
+        /// The evaluator type. Setting it also updates the type stored on the evaluator descriptor.
+        /// </summary>
+        public EvaluatorType Type
+        {
+            get
+            {
+                return _type;
+            }
+
+            set
+            {
+                _type = value;
+                _evaluatorDescriptor.EvaluatorType = value;
+            }
+        }
+
+        /// <summary>
+        /// Shuts this evaluator down. A RUNNING evaluator is sent a kill message and moved to KILLED.
+        /// Afterwards, unless the resource is already marked as released, the manager marks it as
+        /// released and deregisters itself from the driver manager.
+        /// </summary>
+        public void Dispose()
+        {
+            lock (_lock)
+            {
+                if (_state == STATE.RUNNING)
+                {
+                    LOGGER.Log(Level.Warning, "Dirty shutdown of running evaluator :" + Id);
+                    try
+                    {
+                        // Killing the evaluator means that it doesn't need to send a confirmation; it just dies.
+                        EvaluatorControlProto proto = new EvaluatorControlProto();
+                        proto.timestamp = DateTime.Now.Ticks;
+                        proto.identifier = Id;
+                        proto.kill_evaluator = new KillEvaluatorProto();
+                        Handle(proto);
+                    }
+                    catch (Exception e)
+                    {
+                        // Dispose must not throw: a failed kill message is logged and the clean-up below
+                        // still runs, otherwise the manager would never be released from the driver.
+                        Exceptions.Caught(e, Level.Warning, "Unable to send the kill message to evaluator " + Id, LOGGER);
+                    }
+                    finally
+                    {
+                        _state = STATE.KILLED;
+                    }
+                }
+            }
+
+            if (!_isResourceReleased)
+            {
+                try
+                {
+                    // We need to wait awhile before returning the container to the RM in order to
+                    // give the EvaluatorRuntime (and Launcher) time to cleanly exit.
+                    // NOTE(review): the delayed release below has not been ported, so this try block
+                    // is empty and the catch block (the only place that sends the release) cannot run.
+
+                    // this.clock.scheduleAlarm(100, new EventHandler<Alarm>() {
+                    //@Override
+                    //public void onNext(final Alarm alarm) {
+                    //  EvaluatorManager.this.resourceReleaseHandler.onNext(
+                    //      DriverRuntimeProtocol.ResourceReleaseProto.newBuilder()
+                    //          .setIdentifier(EvaluatorManager.this.evaluatorId).build());
+                }
+                catch (Exception e)
+                {
+                    Exceptions.Caught(e, Level.Warning, "Force resource release because the client closed the clock.", LOGGER);
+                    ResourceReleaseProto proto = new ResourceReleaseProto();
+                    proto.identifier = _evaluatorId;
+                    _resourceReleaseHandler.OnNext(proto);
+                }
+                finally
+                {
+                    _isResourceReleased = true;
+                    _driverManager.Release(this);
+                }
+            }
+        }
+
+        /// <summary>
+        /// An EvaluatorException triggers a FailedEvaluator and a state transition to FAILED.
+        /// Does nothing if the evaluator is already DONE, FAILED or KILLED.
+        /// </summary>
+        /// <param name="exception">The failure reported for this evaluator.</param>
+        public void Handle(EvaluatorException exception)
+        {
+            lock (_lock)
+            {
+                if (_state >= STATE.DONE)
+                {
+                    return;
+                }
+                LOGGER.Log(Level.Warning, "Failed Evaluator: " + Id + exception.Message);
+                try
+                {
+                    IList<FailedContext> failedContexts = new List<FailedContext>();
+
+                    // Walk a reversed snapshot so the most recently added contexts are failed first.
+                    foreach (EvaluatorContext context in Enumerable.Reverse(_activeContexts).ToList())
+                    {
+                        Optional<IActiveContext> parentContext = context.ParentId.IsPresent()
+                            ? Optional<IActiveContext>.Of(GetEvaluatorContext(context.ParentId.Value))
+                            : Optional<IActiveContext>.Empty();
+                        failedContexts.Add(context.GetFailedContext(parentContext, exception));
+                    }
+
+                    // TODO: failedContexts is not published anywhere until the dispatcher is ported.
+                    //Optional<FailedTask> failedTask = _runningTask != null ?
+                    //    Optional<FailedTask>.Of(new FailedTask(_runningTask.Id, exception)) : Optional<FailedTask>.Empty();
+                    //LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + failedTask.ToString());
+                    //this.dispatcher.onNext(FailedEvaluator.class, new FailedEvaluatorImpl(
+                    //exception, failedContextList, failedTaskOptional, this.evaluatorId));
+                }
+                catch (Exception e)
+                {
+                    Exceptions.CaughtAndThrow(e, Level.Error, "Exception while handling FailedEvaluator.", LOGGER);
+                }
+                finally
+                {
+                    // The state is set before Dispose so that Dispose does not try to kill the evaluator.
+                    _state = STATE.FAILED;
+                    Dispose();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Processes one heartbeat: evaluator errors first, then the SUBMITTED -> RUNNING transition,
+        /// then the context and task status updates, and finally a FAILED or DONE evaluator state.
+        /// </summary>
+        /// <param name="evaluatorHearBeatProtoMessage">The remote message carrying the heartbeat.</param>
+        public void Handle(IRemoteMessage<EvaluatorHeartbeatProto> evaluatorHearBeatProtoMessage)
+        {
+            lock (_lock)
+            {
+                EvaluatorHeartbeatProto heartbeatProto = evaluatorHearBeatProtoMessage.Message;
+                EvaluatorStatusProto evaluatorStatusProto = heartbeatProto.evaluator_status;
+                if (evaluatorStatusProto != null)
+                {
+                    if (evaluatorStatusProto.error != null)
+                    {
+                        Handle(new EvaluatorException(Id, ByteUtilities.ByteArrarysToString(evaluatorStatusProto.error)));
+                        return;
+                    }
+
+                    if (_state == STATE.SUBMITTED)
+                    {
+                        string evaluatorRId = evaluatorHearBeatProtoMessage.Identifier.ToString();
+                        LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorRId);
+                        // TODO
+                        // _evaluatorControlHandler = _remoteManager.getHandler(evaluatorRID, EvaluatorRuntimeProtocol.EvaluatorControlProto.class);
+                        _state = STATE.RUNNING;
+                        LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} is running", _evaluatorId));
+                    }
+                }
+
+                LOGGER.Log(Level.Info, "Evaluator heartbeat: " + heartbeatProto);
+
+                bool hasTaskStatus = heartbeatProto.task_status != null;
+                foreach (ContextStatusProto contextStatusProto in heartbeatProto.context_status)
+                {
+                    Handle(contextStatusProto, hasTaskStatus);
+                }
+
+                if (hasTaskStatus)
+                {
+                    Handle(heartbeatProto.task_status);
+                }
+
+                // A heartbeat without an evaluator status carries no evaluator state to act on.
+                if (evaluatorStatusProto != null)
+                {
+                    if (evaluatorStatusProto.state == State.FAILED)
+                    {
+                        EvaluatorException e = evaluatorStatusProto.error != null ?
+                            new EvaluatorException(_evaluatorId, ByteUtilities.ByteArrarysToString(evaluatorStatusProto.error)) :
+                            new EvaluatorException(_evaluatorId, "unknown cause");
+                        LOGGER.Log(Level.Warning, "Failed evaluator: " + Id + e.Message);
+
+                        // Do not set FAILED here: Handle(EvaluatorException) ignores evaluators that are
+                        // already in a terminal state and performs the transition (and Dispose) itself.
+                        Handle(e);
+                    }
+                    else if (evaluatorStatusProto.state == State.DONE)
+                    {
+                        LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} done", Id));
+                        _state = STATE.DONE;
+
+                        // TODO
+                        // dispatcher.onNext(CompletedEvaluator.class, new CompletedEvaluator() {
+                        //@Override
+                        //public String getId() {
+                        //  return EvaluatorManager.this.evaluatorId;
+                        Dispose();
+                    }
+                }
+            }
+            LOGGER.Log(Level.Info, "DONE with evaluator heartbeat");
+        }
+
+        /// <summary>
+        /// Forwards the launch request to the resource launch handler and moves from ALLOCATED to SUBMITTED.
+        /// Raises an InvalidOperationException (through Exceptions.Throw) in any other state.
+        /// </summary>
+        /// <param name="resourceLaunchProto">The launch request to forward.</param>
+        public void Handle(ResourceLaunchProto resourceLaunchProto)
+        {
+            lock (_lock)
+            {
+                if (_state == STATE.ALLOCATED)
+                {
+                    _state = STATE.SUBMITTED;
+                    _resourceLaunchHandler.OnNext(resourceLaunchProto);
+                }
+                else
+                {
+                    var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Evaluator manager expected {0} state, but instead is in state {1}", STATE.ALLOCATED, _state));
+                    Exceptions.Throw(e, LOGGER);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Packages the ContextControlProto in an EvaluatorControlProto and forwards it to the EvaluatorRuntime.
+        /// </summary>
+        /// <param name="contextControlProto">The context control message to wrap.</param>
+        public void Handle(ContextControlProto contextControlProto)
+        {
+            lock (_lock)
+            {
+                LOGGER.Log(Level.Info, "Task control message from " + _evaluatorId);
+                EvaluatorControlProto evaluatorControlProto = new EvaluatorControlProto();
+                evaluatorControlProto.timestamp = DateTime.Now.Ticks;
+                evaluatorControlProto.identifier = Id;
+                evaluatorControlProto.context_control = contextControlProto;
+
+                Handle(evaluatorControlProto);
+            }
+        }
+
+        /// <summary>
+        /// Forwards the EvaluatorControlProto to the EvaluatorRuntime. Raises an
+        /// InvalidOperationException (through Exceptions.Throw) when the evaluator is not RUNNING
+        /// or when no control channel to the evaluator has been set up.
+        /// </summary>
+        /// <param name="proto">The control message to forward.</param>
+        public void Handle(EvaluatorControlProto proto)
+        {
+            lock (_lock)
+            {
+                if (_state != STATE.RUNNING)
+                {
+                    var e = new InvalidOperationException(
+                        string.Format(
+                            CultureInfo.InvariantCulture,
+                            "Evaluator manager expects to be in {0} state, but instead is in state {1}",
+                            STATE.RUNNING,
+                            _state));
+                    Exceptions.Throw(e, LOGGER);
+                }
+                else if (_evaluatorControlHandler == null)
+                {
+                    // Report the missing channel explicitly instead of failing with a NullReferenceException.
+                    var e = new InvalidOperationException(
+                        string.Format(
+                            CultureInfo.InvariantCulture,
+                            "No control channel has been established to evaluator {0}",
+                            _evaluatorId));
+                    Exceptions.Throw(e, LOGGER);
+                }
+                else
+                {
+                    _evaluatorControlHandler.OnNext(proto);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Resource status information from the (actual) resource manager. When the resource manager
+        /// reports DONE or FAILED for an evaluator this manager still considers alive, the resource is
+        /// marked as released and the evaluator is moved to KILLED.
+        /// </summary>
+        /// <param name="resourceStatusProto">The status update from the resource manager.</param>
+        public void Handle(ResourceStatusProto resourceStatusProto)
+        {
+            lock (_lock)
+            {
+                State resourceState = resourceStatusProto.state;
+                LOGGER.Log(Level.Info, "Resource manager state update: " + resourceState);
+
+                bool resourceIsGone = resourceState == State.DONE || resourceState == State.FAILED;
+                if (!resourceIsGone || _state >= STATE.DONE)
+                {
+                    return;
+                }
+
+                // something is wrong, I think I'm alive but the resource manager runtime says I'm dead
+                StringBuilder stringBuilder = new StringBuilder();
+                stringBuilder.AppendFormat(
+                    CultureInfo.InvariantCulture,
+                    "The resource manager informed me that Evaluator {0} is in state {1} but I think I am in {2} state",
+                    _evaluatorId,
+                    resourceState,
+                    _state);
+                if (resourceStatusProto.diagnostics != null)
+                {
+                    stringBuilder.Append(" Cause: " + resourceStatusProto.diagnostics);
+                }
+                if (_runningTask != null)
+                {
+                    stringBuilder.AppendFormat(
+                        CultureInfo.InvariantCulture,
+                        " Taskruntime {0} did not complete before this evaluator died.",
+                        _runningTask.Id);
+                }
+
+                // Until the failure is dispatched (see TODO below) the diagnosis is at least logged.
+                LOGGER.Log(Level.Warning, stringBuilder.ToString());
+
+                // RM is telling me its DONE/FAILED - assuming it has already released the resources
+                _isResourceReleased = true;
+                //Handle(new EvaluatorException(_evaluatorId, stringBuilder.ToString(), _runningTask));
+                _state = STATE.KILLED;
+            }
+        }
+
+        /// <summary>
+        /// Handle a context status update. A READY context is registered as active (if new) and its
+        /// messages are processed; any other state removes the context from the active set and must
+        /// be FAIL or DONE.
+        /// </summary>
+        /// <param name="contextStatusProto">The context status reported in the heartbeat.</param>
+        /// <param name="notifyClientOnNewActiveContext">Whether a newly seen READY context should be announced.</param>
+        private void Handle(ContextStatusProto contextStatusProto, bool notifyClientOnNewActiveContext)
+        {
+            string contextId = contextStatusProto.context_id;
+            Optional<string> parentId = contextStatusProto.parent_id != null ?
+                Optional<string>.Of(contextStatusProto.parent_id) : Optional<string>.Empty();
+
+            if (ContextStatusProto.State.READY == contextStatusProto.context_state)
+            {
+                if (!_activeContextIds.Contains(contextId))
+                {
+                    EvaluatorContext evaluatorContext = new EvaluatorContext(this, contextId, parentId);
+                    AddEvaluatorContext(evaluatorContext);
+                    if (notifyClientOnNewActiveContext)
+                    {
+                        LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorContext.ToString());
+                        //TODO
+                        //dispatcher.onNext(ActiveContext.class, context);
+                    }
+                }
+                foreach (ContextStatusProto.ContextMessageProto contextMessageProto in contextStatusProto.context_message)
+                {
+                    byte[] message = contextMessageProto.message;
+                    string sourceId = contextMessageProto.source_id;
+                    LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + sourceId + message);
+                    // this.dispatcher.onNext(ContextMessage.class,
+                    //new ContextMessageImpl(theMessage, contextID, sourceID));
+                }
+
+                // A READY context stays active. Without this return it would be removed again below
+                // and then rejected as an unknown context state.
+                return;
+            }
+
+            if (!_activeContextIds.Contains(contextId))
+            {
+                if (ContextStatusProto.State.FAIL == contextStatusProto.context_state)
+                {
+                    // Register the unseen context so the failure handling below can look it up.
+                    AddEvaluatorContext(new EvaluatorContext(this, contextId, parentId));
+                }
+                else
+                {
+                    var e = new InvalidOperationException("unknown context signaling state " + contextStatusProto.context_state);
+                    Exceptions.Throw(e, LOGGER);
+                }
+            }
+
+            EvaluatorContext context = GetEvaluatorContext(contextId);
+            EvaluatorContext parentContext = context.ParentId.IsPresent() ?
+                GetEvaluatorContext(context.ParentId.Value) : null;
+            RemoveEvaluatorContext(context);
+
+            if (ContextStatusProto.State.FAIL == contextStatusProto.context_state)
+            {
+                // TODO
+                Exception reason = new InvalidOperationException(ByteUtilities.ByteArrarysToString(contextStatusProto.error));
+                Optional<IActiveContext> optionalParentContext = (null == parentContext) ?
+                    Optional<IActiveContext>.Empty() : Optional<IActiveContext>.Of(parentContext);
+                LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + reason.ToString() + optionalParentContext);
+                // TODO
+                //this.dispatcher.onNext(FailedContext.class,
+                //context.getFailedContext(optionalParentContext, reason));
+            }
+            else if (ContextStatusProto.State.DONE == contextStatusProto.context_state)
+            {
+                if (null != parentContext)
+                {
+                    // TODO
+                    //this.dispatcher.onNext(ClosedContext.class, context.getClosedContext(parentContext));
+                }
+                else
+                {
+                    LOGGER.Log(Level.Info, "Root context closed. Evaluator closed will trigger final shutdown.");
+                }
+            }
+            else
+            {
+                var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown context state {0} for context {1}", contextStatusProto.context_state, contextId));
+                Exceptions.Throw(e, LOGGER);
+            }
+        }
+
+        /// <summary>
+        /// Handle task status messages: INIT records a new running task; SUSPEND, DONE and FAILED
+        /// clear it; any other state only processes the attached task messages.
+        /// </summary>
+        /// <param name="taskStatusProto">The task status reported in the heartbeat.</param>
+        private void Handle(TaskStatusProto taskStatusProto)
+        {
+            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Received task {0} status {1}", taskStatusProto.task_id, taskStatusProto.state));
+            string taskId = taskStatusProto.task_id;
+            string contextId = taskStatusProto.context_id;
+            State taskState = taskStatusProto.state;
+
+            if (taskState == State.INIT)
+            {
+                EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId);
+                _runningTask = new RunningTaskImpl(this, taskId, evaluatorContext);
+                // this.dispatcher.onNext(RunningTask.class, this.runningTask);
+            }
+            else if (taskState == State.SUSPEND)
+            {
+                EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId);
+                _runningTask = null;
+
+                // The result is optional; concatenating it directly is safe when it is null.
+                byte[] message = taskStatusProto.result;
+                LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorContext + message);
+                //this.dispatcher.onNext(SuspendedTask.class, new SuspendedTaskImpl(evaluatorContext, message, taskId));
+            }
+            else if (taskState == State.DONE)
+            {
+                EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId);
+                _runningTask = null;
+
+                // The result is optional; concatenating it directly is safe when it is null.
+                byte[] message = taskStatusProto.result;
+                LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorContext + message);
+                //this.dispatcher.onNext(CompletedTask.class, new CompletedTaskImpl(evaluatorContext, message, taskId));
+            }
+            else if (taskState == State.FAILED)
+            {
+                _runningTask = null;
+                //EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId);
+                //FailedTask failedTask = taskStatusProto.result != null ?
+                //    new FailedTask(taskId, ByteUtilities.ByteArrarysToString(taskStatusProto.result), Optional<IActiveContext>.Of(evaluatorContext)) :
+                //    new FailedTask(taskId, "Failed task: " + taskState, Optional<IActiveContext>.Of(evaluatorContext));
+                //LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + failedTask.ToString());
+                //this.dispatcher.onNext(FailedTask.class, taskException);
+            }
+            else if (taskStatusProto.task_message.Count > 0)
+            {
+                // NOTE(review): this rejects task messages whenever a task is tracked as running, which
+                // looks inverted (messages presumably come from the running task) - confirm the intent.
+                if (_runningTask != null)
+                {
+                    var e = new InvalidOperationException("runningTask must be null when there are multiple task messages");
+                    Exceptions.Throw(e, LOGGER);
+                }
+                foreach (TaskStatusProto.TaskMessageProto taskMessageProto in taskStatusProto.task_message)
+                {
+                    LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + taskMessageProto.ToString());
+                    // this.dispatcher.onNext(TaskMessage.class,
+                    //new TaskMessageImpl(taskMessageProto.getMessage().toByteArray(),
+                    //        taskId, contextId, taskMessageProto.getSourceId()));
+                }
+            }
+        }
+
+        /// <summary>
+        /// Looks up an active context by id. Raises an InvalidOperationException (through
+        /// Exceptions.Throw) when no active context has that id.
+        /// </summary>
+        /// <param name="id">The context id to look for.</param>
+        /// <returns>The matching active context.</returns>
+        private EvaluatorContext GetEvaluatorContext(string id)
+        {
+            foreach (EvaluatorContext context in _activeContexts)
+            {
+                if (context.Id.Equals(id))
+                {
+                    return context;
+                }
+            }
+
+            // Only report a miss after every active context has been checked.
+            var e = new InvalidOperationException("Unknown evaluator context with id " + id);
+            Exceptions.Throw(e, LOGGER);
+            return null;
+        }
+
+        /// <summary>
+        /// Registers a context as active, in both the list and the id set.
+        /// </summary>
+        private void AddEvaluatorContext(EvaluatorContext context)
+        {
+            _activeContexts.Add(context);
+            _activeContextIds.Add(context.Id);
+        }
+
+        /// <summary>
+        /// Removes a context from the active list and the id set.
+        /// </summary>
+        private void RemoveEvaluatorContext(EvaluatorContext context)
+        {
+            _activeContexts.Remove(context);
+            _activeContextIds.Remove(context.Id);
+        }
+
+        [NamedParameter(documentation: "The Evaluator Identifier.")]
+        public class EvaluatorIdentifier : Name<string>
+        {
+        }
+
+        [NamedParameter(documentation: "The Evaluator Host.")]
+        public class EvaluatorDescriptorName : Name<EvaluatorDescriptorImpl>
+        {
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/FailedJob.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/FailedJob.cs b/lang/cs/Org.Apache.REEF.Driver/FailedJob.cs
new file mode 100644
index 0000000..5f36437
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/FailedJob.cs
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Api;
+using Org.Apache.REEF.Utilities;
+using System;
+
+namespace Org.Apache.REEF.Driver
+{
+    /// <summary>
+    /// An error message that REEF Client receives when there is a user error in REEF job.
+    /// </summary>
+    public class FailedJob : AbstractFailure
+    {
+        /// <summary>
+        /// Create an error message given the entity ID and Java Exception.
+        /// </summary>
+        /// <param name="id">The id of the failed entity.</param>
+        /// <param name="cause">The exception that caused the failure; may be null.</param>
+        public FailedJob(string id, Exception cause)
+            : base(id, cause)
+        {
+            // The properties below hide ("new") the base class members of the same name, so the
+            // values passed to the base constructor are not visible through them. Populate them
+            // here; otherwise Id, Message and Cause read back as null on a freshly built FailedJob.
+            Id = id;
+            Message = cause != null ? cause.Message : null;
+            Cause = cause != null ? Optional<Exception>.Of(cause) : Optional<Exception>.Empty();
+
+            // No description or payload is known at construction time.
+            Description = Optional<string>.Empty();
+            Data = Optional<byte[]>.Empty();
+        }
+
+        /// <summary>The id of the failed entity.</summary>
+        public new string Id { get; set; }
+
+        /// <summary>The message of the causing exception, or null when no cause was given.</summary>
+        public new string Message { get; set; }
+
+        /// <summary>An optional longer description of the failure.</summary>
+        public new Optional<string> Description { get; set; }
+
+        /// <summary>The exception that caused the failure, if one was given.</summary>
+        public new Optional<Exception> Cause { get; set; }
+
+        /// <summary>Optional data attached to the failure.</summary>
+        public new Optional<byte[]> Data { get; set; }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/IDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/IDriver.cs b/lang/cs/Org.Apache.REEF.Driver/IDriver.cs
new file mode 100644
index 0000000..e917ada
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/IDriver.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Driver
+{
+    /// <summary>
+    /// Marker interface without members; it exists so that the driver dll can be referenced.
+    /// </summary>
+    public interface IDriver
+    {
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/IStartHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/IStartHandler.cs b/lang/cs/Org.Apache.REEF.Driver/IStartHandler.cs
new file mode 100644
index 0000000..0f03295
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/IStartHandler.cs
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Driver
+{
+    /// <summary>
+    /// Contract for a start handler; it exposes a single readable and writable identifier.
+    /// </summary>
+    public interface IStartHandler
+    {
+        /// <summary>
+        /// The identifier of this handler.
+        /// </summary>
+        string Identifier { get; set; }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
new file mode 100644
index 0000000..ef874e8
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
@@ -0,0 +1,226 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+--> +<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" /> + <PropertyGroup> + <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> + <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform> + <ProjectGuid>{A6BAA2A7-F52F-4329-884E-1BCF711D6805}</ProjectGuid> + <OutputType>Library</OutputType> + <AppDesignerFolder>Properties</AppDesignerFolder> + <RootNamespace>Org.Apache.REEF.Driver</RootNamespace> + <AssemblyName>Org.Apache.REEF.Driver</AssemblyName> + <TargetFrameworkVersion>v4.5</TargetFrameworkVersion> + <FileAlignment>512</FileAlignment> + <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\..\..\..</SolutionDir> + <RestorePackages>true</RestorePackages> + </PropertyGroup> + <Import Project="$(SolutionDir)\Source\build.props" /> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugSymbols>true</DebugSymbols> + <DebugType>full</DebugType> + <Optimize>false</Optimize> + <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath> + <DefineConstants>DEBUG;TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x64' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugSymbols>true</DebugSymbols> + <DebugType>full</DebugType> + <Optimize>false</Optimize> + <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath> + <DefineConstants>DEBUG;TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 
'Release|AnyCPU' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugType>pdbonly</DebugType> + <Optimize>true</Optimize> + <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath> + <DefineConstants>TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x64' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugType>pdbonly</DebugType> + <Optimize>true</Optimize> + <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath> + <DefineConstants>TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <ItemGroup> + <Reference Include="Microsoft.Hadoop.Avro"> + <HintPath>$(PackagesDir)\Microsoft.Hadoop.Avro.1.4.0.0\lib\net40\Microsoft.Hadoop.Avro.dll</HintPath> + </Reference> + <Reference Include="Newtonsoft.Json"> + <HintPath>$(PackagesDir)\Newtonsoft.Json.6.0.8\lib\net45\Newtonsoft.Json.dll</HintPath> + </Reference> + <Reference Include="protobuf-net"> + <HintPath>$(PackagesDir)\protobuf-net.2.0.0.668\lib\net40\protobuf-net.dll</HintPath> + </Reference> + <Reference Include="System" /> + <Reference Include="System.Core" /> + <Reference Include="System.Runtime.Serialization" /> + <Reference Include="System.Xml.Linq" /> + <Reference Include="System.Data.DataSetExtensions" /> + <Reference Include="Microsoft.CSharp" /> + <Reference Include="System.Data" /> + <Reference Include="System.Xml" /> + </ItemGroup> + <ItemGroup> + <Compile Include="bridge\BridgeLogger.cs" /> + <Compile Include="bridge\clr2java\IActiveContextClr2Java.cs" /> + <Compile Include="bridge\clr2java\IAllocatedEvaluaotrClr2Java.cs" /> + <Compile Include="bridge\clr2java\IClosedContextClr2Java.cs" /> + <Compile Include="bridge\clr2java\IClr2Java.cs" /> + <Compile Include="bridge\clr2java\ICompletedEvaluatorClr2Java.cs" /> + <Compile 
Include="bridge\clr2java\ICompletedTaskClr2Java.cs" /> + <Compile Include="bridge\clr2java\IContextMessageClr2Java.cs" /> + <Compile Include="bridge\clr2java\IEvaluatorRequestorClr2Java.cs" /> + <Compile Include="bridge\clr2java\IFailedContextClr2Java.cs" /> + <Compile Include="bridge\clr2java\IFailedEvaluatorClr2Java.cs" /> + <Compile Include="bridge\clr2java\IFailedTaskClr2Java.cs" /> + <Compile Include="bridge\clr2java\IHttpServerBridgeClr2Java.cs" /> + <Compile Include="bridge\clr2java\IRunningTaskClr2Java.cs" /> + <Compile Include="bridge\clr2java\ISuspendedTaskClr2Java.cs" /> + <Compile Include="bridge\clr2java\ITaskMessageClr2Java.cs" /> + <Compile Include="bridge\ClrClientHelper.cs" /> + <Compile Include="bridge\ClrHandlerHelper.cs" /> + <Compile Include="bridge\ClrSystemHandler.cs" /> + <Compile Include="bridge\ClrSystemHandlerWrapper.cs" /> + <Compile Include="bridge\DriverBridge.cs" /> + <Compile Include="bridge\DriverBridgeConfiguration.cs" /> + <Compile Include="bridge\DriverBridgeConfigurationOptions.cs" /> + <Compile Include="bridge\events\ActiveContext.cs" /> + <Compile Include="bridge\events\AllocatedEvaluator.cs" /> + <Compile Include="bridge\events\ClosedContext.cs" /> + <Compile Include="bridge\events\CompletedEvaluator.cs" /> + <Compile Include="bridge\events\CompletedTask.cs" /> + <Compile Include="bridge\events\ContextMessage.cs" /> + <Compile Include="bridge\events\EvaluatorRequstor.cs" /> + <Compile Include="bridge\events\FailedContext.cs" /> + <Compile Include="bridge\events\FailedEvaluator.cs" /> + <Compile Include="bridge\events\FailedTask.cs" /> + <Compile Include="bridge\events\RunningTask.cs" /> + <Compile Include="bridge\events\SuspendedTask.cs" /> + <Compile Include="bridge\events\TaskMessage.cs" /> + <Compile Include="bridge\HttpMessage.cs" /> + <Compile Include="bridge\HttpServerHandler.cs" /> + <Compile Include="bridge\HttpServerPort.cs" /> + <Compile Include="bridge\IHttpHandler.cs" /> + <Compile Include="bridge\IHttpMessage.cs" 
/> + <Compile Include="bridge\ReefHttpRequest.cs" /> + <Compile Include="bridge\ReefHttpResponse.cs" /> + <Compile Include="ClientManager.cs" /> + <Compile Include="Constants.cs" /> + <Compile Include="context\ContextConfiguration.cs" /> + <Compile Include="context\ContextConfigurationOptions.cs" /> + <Compile Include="context\defaults\DefaultContextMessageSource.cs" /> + <Compile Include="context\defaults\DefaultContextStartHandler.cs" /> + <Compile Include="context\defaults\DefaultContextStopHandler.cs" /> + <Compile Include="context\EvaluatorContext.cs" /> + <Compile Include="context\IActiveContext.cs" /> + <Compile Include="context\IClosedContext.cs" /> + <Compile Include="context\IContext.cs" /> + <Compile Include="context\IFailedContext.cs" /> + <Compile Include="contract\IBridgeContract.cs" /> + <Compile Include="defaults\DefaultClientCloseHandler.cs" /> + <Compile Include="defaults\DefaultClientCloseWithMessageHandler.cs" /> + <Compile Include="defaults\DefaultClientMessageHandler.cs" /> + <Compile Include="defaults\DefaultContextActiveHandler.cs" /> + <Compile Include="defaults\DefaultContextClosureHandler.cs" /> + <Compile Include="defaults\DefaultContextFailureHandler.cs" /> + <Compile Include="defaults\DefaultContextMessageHandler.cs" /> + <Compile Include="defaults\DefaultCustomTraceListener.cs" /> + <Compile Include="defaults\DefaultDriverRestartContextActiveHandler.cs" /> + <Compile Include="defaults\DefaultDriverRestartHandler.cs" /> + <Compile Include="defaults\DefaultDriverRestartTaskRunningHandler.cs" /> + <Compile Include="defaults\DefaultEvaluatorAllocationHandler.cs" /> + <Compile Include="defaults\DefaultEvaluatorCompletionHandler.cs" /> + <Compile Include="defaults\DefaultEvaluatorFailureHandler.cs" /> + <Compile Include="defaults\DefaultEvaluatorRequestorHandler.cs" /> + <Compile Include="defaults\DefaultHttpHandler.cs" /> + <Compile Include="defaults\DefaultTaskCompletionHandler.cs" /> + <Compile 
Include="defaults\DefaultTaskFailureHandler.cs" /> + <Compile Include="defaults\DefaultTaskMessageHandler.cs" /> + <Compile Include="defaults\DefaultTaskRunningHandler.cs" /> + <Compile Include="defaults\DefaultTaskSuspensionHandler.cs" /> + <Compile Include="DriverConfigGenerator.cs" /> + <Compile Include="DriverConfigurationSettings.cs" /> + <Compile Include="DriverManager.cs" /> + <Compile Include="DriverRuntimeConfiguration.cs" /> + <Compile Include="DriverRuntimeConfigurationOptions.cs" /> + <Compile Include="DriverSubmissionSettings.cs" /> + <Compile Include="EvaluatorManager.cs" /> + <Compile Include="evaluator\EvaluatorDescriptorImpl.cs" /> + <Compile Include="evaluator\EvaluatorRequest.cs" /> + <Compile Include="evaluator\EvaluatorRequestBuilder.cs" /> + <Compile Include="evaluator\IAllocatedEvaluator.cs" /> + <Compile Include="evaluator\ICompletedEvaluator.cs" /> + <Compile Include="evaluator\IEvaluatorDescriptor.cs" /> + <Compile Include="evaluator\IEvaluatorRequest .cs" /> + <Compile Include="evaluator\IEvaluatorRequestor.cs" /> + <Compile Include="evaluator\IFailedEvaluator.cs" /> + <Compile Include="FailedJob.cs" /> + <Compile Include="IDriver.cs" /> + <Compile Include="IStartHandler.cs" /> + <Compile Include="Properties\AssemblyInfo.cs" /> + <Compile Include="task\ICompletedTask.cs" /> + <Compile Include="task\IFailedTask.cs" /> + <Compile Include="task\IRunningTask.cs" /> + <Compile Include="task\ISuspendedTask.cs" /> + <Compile Include="task\ITaskMessage.cs" /> + <Compile Include="task\RunningTaskImpl.cs" /> + </ItemGroup> + <ItemGroup> + <None Include="packages.config" /> + </ItemGroup> + <ItemGroup> + <ProjectReference Include="..\Org.Apache.REEF.Common\Org.Apache.REEF.Common.csproj"> + <Project>{545a0582-4105-44ce-b99c-b1379514a630}</Project> + <Name>ReefCommon</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj"> + <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project> + 
<Name>Org.Apache.REEF.Tang</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Utilities\Org.Apache.REEF.Utilities.csproj"> + <Project>{79e7f89a-1dfb-45e1-8d43-d71a954aeb98}</Project> + <Name>Org.Apache.REEF.Utilities</Name> + </ProjectReference> + <ProjectReference Include="..\Org.Apache.REEF.Wake\Org.Apache.REEF.Wake.csproj"> + <Project>{cdfb3464-4041-42b1-9271-83af24cd5008}</Project> + <Name>Org.Apache.REEF.Wake</Name> + </ProjectReference> + </ItemGroup> + <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> + <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" /> + <!-- To modify your build process, add your task inside one of the targets below and uncomment it. + Other similar extension points exist, see Microsoft.Common.targets. + <Target Name="BeforeBuild"> + </Target> + <Target Name="AfterBuild"> + </Target> + --> +</Project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/Properties/AssemblyInfo.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Driver/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..79fba06 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/Properties/AssemblyInfo.cs @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Org.Apache.REEF.Driver")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Org.Apache.REEF.Driver")] +[assembly: AssemblyCopyright("Copyright © 2015")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. 
+[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("81ea2648-b341-4852-93b0-806da615c6b8")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/bridge/BridgeLogger.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/bridge/BridgeLogger.cs b/lang/cs/Org.Apache.REEF.Driver/bridge/BridgeLogger.cs new file mode 100644 index 0000000..3e2dada --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/bridge/BridgeLogger.cs @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/

using System;

using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Driver.Bridge
{
    /// <summary>
    /// A wrapper around the general Logger class used specifically for
    /// logging in CPP bridge code.
    /// This is enabled when trace level is above Level.Info (included)
    /// </summary>
    public class BridgeLogger
    {
        // Assigned once in the constructor and never replaced afterwards.
        private readonly Logger _logger;

        /// <summary>
        /// Creates a bridge logger backed by the Logger obtained from
        /// Logger.GetLogger for the given name.
        /// </summary>
        /// <param name="name">Name passed to Logger.GetLogger.</param>
        public BridgeLogger(string name)
        {
            _logger = Logger.GetLogger(name);
        }

        /// <summary>
        /// Factory method equivalent to calling the constructor; every call
        /// returns a new BridgeLogger instance.
        /// </summary>
        /// <param name="className">Name passed on to the constructor.</param>
        /// <returns>A new BridgeLogger for <paramref name="className"/>.</returns>
        public static BridgeLogger GetLogger(string className)
        {
            return new BridgeLogger(className);
        }

        /// <summary>
        /// Logs the message at Level.Info.
        /// </summary>
        /// <param name="message">Text to log.</param>
        public void Log(string message)
        {
            _logger.Log(Level.Info, message);
        }

        /// <summary>
        /// Logs the message at Level.Start.
        /// </summary>
        /// <param name="message">Text to log.</param>
        public void LogStart(string message)
        {
            _logger.Log(Level.Start, message);
        }

        /// <summary>
        /// Logs the message at Level.Stop.
        /// </summary>
        /// <param name="message">Text to log.</param>
        public void LogStop(string message)
        {
            _logger.Log(Level.Stop, message);
        }

        /// <summary>
        /// Logs the message together with the exception at Level.Error.
        /// </summary>
        /// <param name="message">Text to log.</param>
        /// <param name="e">Exception forwarded to the underlying logger.</param>
        public void LogError(string message, Exception e)
        {
            _logger.Log(Level.Error, message, e);
        }
    }
}
