http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
new file mode 100644
index 0000000..531ebbf
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
@@ -0,0 +1,277 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.Globalization;
using System.IO;
using System.Runtime.InteropServices;
using Org.Apache.REEF.Common.Context;
using Org.Apache.REEF.Driver.Bridge.Clr2java;
using Org.Apache.REEF.Driver.Bridge.Events;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Driver.Task;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Implementations;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Wake.Time;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Wake.Time.Event;
using ContextMessage = Org.Apache.REEF.Driver.Bridge.Events.ContextMessage;

namespace Org.Apache.REEF.Driver.Bridge
{
    /// <summary>
    /// Static entry points that take a raw handler handle plus a Clr2Java bridge object,
    /// wrap the bridge object in the matching driver event type and push it into the
    /// <see cref="ClrSystemHandler{T}"/> the handle refers to.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the <c>handle</c> values are presumably the ones returned by
    /// <see cref="DriverBridge.Subscribe"/> via the OnStart methods below - confirm against the caller.
    /// </remarks>
    public class ClrSystemHandlerWrapper
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ClrSystemHandlerWrapper));

        // Set by GetHandlers; the bridge whose Subscribe() produces the handler handles.
        private static DriverBridge _driverBridge;

        /// <summary>
        /// Forwards a new <see cref="AllocatedEvaluator"/> built from <paramref name="clr2Java"/> to the handler behind <paramref name="handle"/>.
        /// </summary>
        /// <param name="handle">GCHandle value (as an integer) of a <c>ClrSystemHandler&lt;IAllocatedEvaluator&gt;</c>.</param>
        /// <param name="clr2Java">Bridge object wrapped by the event.</param>
        public static void Call_ClrSystemAllocatedEvaluatorHandler_OnNext(ulong handle, IAllocatedEvaluaotrClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemAllocatedEvaluatorHandler_OnNext"))
            {
                ResolveHandler<IAllocatedEvaluator>(handle).OnNext(new AllocatedEvaluator(clr2Java));
            }
        }

        /// <summary>
        /// Forwards a new <see cref="ActiveContext"/> built from <paramref name="clr2Java"/> to the handler behind <paramref name="handle"/>.
        /// </summary>
        /// <param name="handle">GCHandle value (as an integer) of a <c>ClrSystemHandler&lt;IActiveContext&gt;</c>.</param>
        /// <param name="clr2Java">Bridge object wrapped by the event.</param>
        public static void Call_ClrSystemActiveContextHandler_OnNext(ulong handle, IActiveContextClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemActiveContextHandler_OnNext"))
            {
                ResolveHandler<IActiveContext>(handle).OnNext(new ActiveContext(clr2Java));
            }
        }

        /// <summary>
        /// Driver-restart variant of the active context entry point: forwards a new
        /// <see cref="ActiveContext"/> to the handler behind <paramref name="handle"/>.
        /// </summary>
        /// <param name="handle">GCHandle value (as an integer) of a <c>ClrSystemHandler&lt;IActiveContext&gt;</c>.</param>
        /// <param name="clr2Java">Bridge object wrapped by the event.</param>
        public static void Call_ClrSystemDriverRestartActiveContextHandler_OnNext(ulong handle, IActiveContextClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemDriverRestartActiveContextHandler_OnNext"))
            {
                ResolveHandler<IActiveContext>(handle).OnNext(new ActiveContext(clr2Java));
            }
        }

        /// <summary>
        /// Forwards a new <see cref="EvaluatorRequestor"/> built from <paramref name="clr2Java"/> to the handler behind <paramref name="handle"/>.
        /// </summary>
        /// <param name="handle">GCHandle value (as an integer) of a <c>ClrSystemHandler&lt;IEvaluatorRequestor&gt;</c>.</param>
        /// <param name="clr2Java">Bridge object wrapped by the event.</param>
        public static void Call_ClrSystemEvaluatorRequestor_OnNext(ulong handle, IEvaluatorRequestorClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemEvaluatorRequestor_OnNext"))
            {
                ResolveHandler<IEvaluatorRequestor>(handle).OnNext(new EvaluatorRequestor(clr2Java));
            }
        }

        /// <summary>
        /// Forwards a new <see cref="TaskMessage"/> built from <paramref name="clr2Java"/> and <paramref name="message"/> to the handler behind <paramref name="handle"/>.
        /// </summary>
        /// <param name="handle">GCHandle value (as an integer) of a <c>ClrSystemHandler&lt;ITaskMessage&gt;</c>.</param>
        /// <param name="clr2Java">Bridge object wrapped by the event.</param>
        /// <param name="message">Raw message bytes passed to the <see cref="TaskMessage"/> constructor.</param>
        public static void Call_ClrSystemTaskMessage_OnNext(ulong handle, ITaskMessageClr2Java clr2Java, byte[] message)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemTaskMessage_OnNext"))
            {
                ResolveHandler<ITaskMessage>(handle).OnNext(new TaskMessage(clr2Java, message));
            }
        }

        /// <summary>
        /// Forwards a new <see cref="FailedTask"/> built from <paramref name="clr2Java"/> to the handler behind <paramref name="handle"/>.
        /// </summary>
        /// <param name="handle">GCHandle value (as an integer) of a <c>ClrSystemHandler&lt;IFailedTask&gt;</c>.</param>
        /// <param name="clr2Java">Bridge object wrapped by the event.</param>
        public static void Call_ClrSystemFailedTask_OnNext(ulong handle, IFailedTaskClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemFailedTask_OnNext"))
            {
                ResolveHandler<IFailedTask>(handle).OnNext(new FailedTask(clr2Java));
            }
        }

        /// <summary>
        /// Forwards a new <see cref="RunningTask"/> built from <paramref name="clr2Java"/> to the handler behind <paramref name="handle"/>.
        /// </summary>
        /// <param name="handle">GCHandle value (as an integer) of a <c>ClrSystemHandler&lt;IRunningTask&gt;</c>.</param>
        /// <param name="clr2Java">Bridge object wrapped by the event.</param>
        public static void Call_ClrSystemRunningTask_OnNext(ulong handle, IRunningTaskClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemRunningTask_OnNext"))
            {
                ResolveHandler<IRunningTask>(handle).OnNext(new RunningTask(clr2Java));
            }
        }

        /// <summary>
        /// Driver-restart variant of the running task entry point: forwards a new
        /// <see cref="RunningTask"/> to the handler behind <paramref name="handle"/>.
        /// </summary>
        /// <param name="handle">GCHandle value (as an integer) of a <c>ClrSystemHandler&lt;IRunningTask&gt;</c>.</param>
        /// <param name="clr2Java">Bridge object wrapped by the event.</param>
        public static void Call_ClrSystemDriverRestartRunningTask_OnNext(ulong handle, IRunningTaskClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemDriverRestartRunningTask_OnNext"))
            {
                ResolveHandler<IRunningTask>(handle).OnNext(new RunningTask(clr2Java));
            }
        }

        /// <summary>
        /// Forwards a new <see cref="FailedEvaluator"/> built from <paramref name="clr2Java"/> to the handler behind <paramref name="handle"/>.
        /// </summary>
        /// <param name="handle">GCHandle value (as an integer) of a <c>ClrSystemHandler&lt;IFailedEvaluator&gt;</c>.</param>
        /// <param name="clr2Java">Bridge object wrapped by the event.</param>
        public static void Call_ClrSystemFailedEvaluator_OnNext(ulong handle, IFailedEvaluatorClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemFailedEvaluator_OnNext"))
            {
                ResolveHandler<IFailedEvaluator>(handle).OnNext(new FailedEvaluator(clr2Java));
            }
        }

        /// <summary>
        /// Forwards a new <see cref="CompletedTask"/> built from <paramref name="clr2Java"/> to the handler behind <paramref name="handle"/>.
        /// </summary>
        /// <param name="handle">GCHandle value (as an integer) of a <c>ClrSystemHandler&lt;ICompletedTask&gt;</c>.</param>
        /// <param name="clr2Java">Bridge object wrapped by the event.</param>
        public static void Call_ClrSystemCompletedTask_OnNext(ulong handle, ICompletedTaskClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemCompletedTask_OnNext"))
            {
                ResolveHandler<ICompletedTask>(handle).OnNext(new CompletedTask(clr2Java));
            }
        }

        /// <summary>
        /// Forwards a new <see cref="SuspendedTask"/> built from <paramref name="clr2Java"/> to the handler behind <paramref name="handle"/>.
        /// </summary>
        /// <param name="handle">GCHandle value (as an integer) of a <c>ClrSystemHandler&lt;ISuspendedTask&gt;</c>.</param>
        /// <param name="clr2Java">Bridge object wrapped by the event.</param>
        public static void Call_ClrSystemSuspendedTask_OnNext(ulong handle, ISuspendedTaskClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemSuspendedTask_OnNext"))
            {
                ResolveHandler<ISuspendedTask>(handle).OnNext(new SuspendedTask(clr2Java));
            }
        }

        /// <summary>
        /// Forwards a new <see cref="CompletedEvaluator"/> built from <paramref name="clr2Java"/> to the handler behind <paramref name="handle"/>.
        /// </summary>
        /// <param name="handle">GCHandle value (as an integer) of a <c>ClrSystemHandler&lt;ICompletedEvaluator&gt;</c>.</param>
        /// <param name="clr2Java">Bridge object wrapped by the event.</param>
        public static void Call_ClrSystemCompletedEvaluator_OnNext(ulong handle, ICompletedEvaluatorClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemCompletedEvaluator_OnNext"))
            {
                ResolveHandler<ICompletedEvaluator>(handle).OnNext(new CompletedEvaluator(clr2Java));
            }
        }

        /// <summary>
        /// Forwards a new <see cref="HttpMessage"/> built from <paramref name="clr2Java"/> to the handler behind <paramref name="handle"/>.
        /// </summary>
        /// <param name="handle">GCHandle value (as an integer) of a <c>ClrSystemHandler&lt;IHttpMessage&gt;</c>.</param>
        /// <param name="clr2Java">Bridge object wrapped by the event.</param>
        public static void Call_ClrSystemHttpServer_OnNext(ulong handle, IHttpServerBridgeClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemHttpServer_OnNext"))
            {
                ResolveHandler<IHttpMessage>(handle).OnNext(new HttpMessage(clr2Java));
            }
        }

        /// <summary>
        /// Forwards a new <see cref="ClosedContext"/> built from <paramref name="clr2Java"/> to the handler behind <paramref name="handle"/>.
        /// </summary>
        /// <param name="handle">GCHandle value (as an integer) of a <c>ClrSystemHandler&lt;IClosedContext&gt;</c>.</param>
        /// <param name="clr2Java">Bridge object wrapped by the event.</param>
        public static void Call_ClrSystemClosedContext_OnNext(ulong handle, IClosedContextClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemClosedContext_OnNext"))
            {
                ResolveHandler<IClosedContext>(handle).OnNext(new ClosedContext(clr2Java));
            }
        }

        /// <summary>
        /// Forwards a new <see cref="FailedContext"/> built from <paramref name="clr2Java"/> to the handler behind <paramref name="handle"/>.
        /// </summary>
        /// <param name="handle">GCHandle value (as an integer) of a <c>ClrSystemHandler&lt;IFailedContext&gt;</c>.</param>
        /// <param name="clr2Java">Bridge object wrapped by the event.</param>
        public static void Call_ClrSystemFailedContext_OnNext(ulong handle, IFailedContextClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemFailedContext_OnNext"))
            {
                ResolveHandler<IFailedContext>(handle).OnNext(new FailedContext(clr2Java));
            }
        }

        /// <summary>
        /// Forwards a new <see cref="ContextMessage"/> built from <paramref name="clr2Java"/> to the handler behind <paramref name="handle"/>.
        /// </summary>
        /// <param name="handle">GCHandle value (as an integer) of a <c>ClrSystemHandler&lt;IContextMessage&gt;</c>.</param>
        /// <param name="clr2Java">Bridge object wrapped by the event.</param>
        public static void Call_ClrSystemContextMessage_OnNext(ulong handle, IContextMessageClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemContextMessage_OnNext"))
            {
                ResolveHandler<IContextMessage>(handle).OnNext(new ContextMessage(clr2Java));
            }
        }

        /// <summary>
        /// Pushes a <see cref="StartTime"/> stamped with the current local clock ticks to the
        /// driver restart handler behind <paramref name="handle"/>.
        /// </summary>
        /// <param name="handle">GCHandle value (as an integer) of a <c>ClrSystemHandler&lt;StartTime&gt;</c>.</param>
        public static void Call_ClrSystemDriverRestart_OnNext(ulong handle)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemDriverRestart_OnNext"))
            {
                // NOTE(review): uses local-time ticks (DateTime.Now); confirm that is the unit StartTime expects.
                ResolveHandler<StartTime>(handle).OnNext(new StartTime(DateTime.Now.Ticks));
            }
        }

        /// <summary>
        /// Logs the start time and returns the handler handles, without an HTTP server port.
        /// </summary>
        /// <param name="startTime">Start time; only written to the log.</param>
        /// <returns>The handles returned by <see cref="DriverBridge.Subscribe"/>.</returns>
        // Deprecated: remove once both the Java and the C# side use the overload that takes the port.
        public static ulong[] Call_ClrSystemStartHandler_OnStart(DateTime startTime)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart"))
            {
                LOGGER.Log(Level.Info, "*** Start time is " + startTime);
                return GetHandlers(null);
            }
        }

        /// <summary>
        /// Logs the start time and HTTP server port, then returns the handler handles.
        /// </summary>
        /// <param name="startTime">Start time; only written to the log.</param>
        /// <param name="httpServerPort">Port number as an invariant-culture integer string, or null for port 0.</param>
        /// <returns>The handles returned by <see cref="DriverBridge.Subscribe"/>.</returns>
        public static ulong[] Call_ClrSystemStartHandler_OnStart(DateTime startTime, string httpServerPort)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart"))
            {
                LOGGER.Log(Level.Info, "*** Start time is " + startTime);
                LOGGER.Log(Level.Info, "*** httpServerPort: " + httpServerPort);
                return GetHandlers(httpServerPort);
            }
        }

        /// <summary>
        /// Resolves the integer form of a GCHandle back to the <see cref="ClrSystemHandler{T}"/> it targets.
        /// </summary>
        /// <typeparam name="T">Event type of the handler the handle is expected to reference.</typeparam>
        /// <param name="handle">GCHandle value as an integer.</param>
        /// <returns>The handle's target cast to <c>ClrSystemHandler&lt;T&gt;</c>.</returns>
        private static ClrSystemHandler<T> ResolveHandler<T>(ulong handle)
        {
            GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle);
            return (ClrSystemHandler<T>)gc.Target;
        }

        /// <summary>
        /// Loads the driver bridge configuration from
        /// <c>&lt;current directory&gt;/reef/global/&lt;Constants.DriverBridgeConfiguration&gt;</c>, builds an injector
        /// from it, sets the HTTP server port, instantiates the start handler and the
        /// <see cref="DriverBridge"/>, and returns the bridge's subscribed handler handles.
        /// </summary>
        /// <param name="httpServerPortNumber">Port as an invariant-culture integer string; null means port 0.</param>
        /// <returns>The array returned by <see cref="DriverBridge.Subscribe"/>.</returns>
        private static ulong[] GetHandlers(string httpServerPortNumber)
        {
            IInjector injector = null;
            string errorMessage;
            string bridgeConfiguration = Path.Combine(Directory.GetCurrentDirectory(), "reef", "global", Constants.DriverBridgeConfiguration);
            if (!File.Exists(bridgeConfiguration))
            {
                errorMessage = "Cannot find CLR Driver bridge configuration file " + bridgeConfiguration;
                Exceptions.Throw(new InvalidOperationException(errorMessage), LOGGER);
            }

            try
            {
                IConfiguration driverBridgeConfiguration = new AvroConfigurationSerializer().FromFile(bridgeConfiguration);
                injector = TangFactory.GetTang().NewInjector(driverBridgeConfiguration);
            }
            catch (Exception e)
            {
                errorMessage = "Failed to get injector from driver bridge configuration.";
                Exceptions.CaughtAndThrow(new InvalidOperationException(errorMessage, e), Level.Error, errorMessage, LOGGER);
            }

            try
            {
                HttpServerPort port = injector.GetInstance<HttpServerPort>();
                port.PortNumber = httpServerPortNumber == null ? 0 : int.Parse(httpServerPortNumber, CultureInfo.InvariantCulture);

                // The start handler is resolved before the bridge; it is only used for the log line here.
                IStartHandler startHandler = injector.GetInstance<IStartHandler>();
                LOGGER.Log(Level.Info, "Start handler set to be " + startHandler.Identifier);
                _driverBridge = injector.GetInstance<DriverBridge>();
            }
            catch (Exception e)
            {
                Exceptions.CaughtAndThrow(e, Level.Error, "Cannot get instance.", LOGGER);
            }

            return _driverBridge.Subscribe();
        }
    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs
new file mode 100644
index 0000000..1ecda50
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs
@@ -0,0 +1,322 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using Org.Apache.REEF.Common.Context;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Driver.Task;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Tang.Annotations;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Linq;

using Org.Apache.REEF.Wake.Time;
using Org.Apache.REEF.Wake.Time.Event;

namespace Org.Apache.REEF.Driver.Bridge
{
    /// <summary>
    /// Holds the injected driver event handlers and, in <see cref="Subscribe"/>, attaches
    /// them to one <see cref="ClrSystemHandler{T}"/> per event type and returns a handle
    /// for each of those system handlers.
    /// </summary>
    public class DriverBridge
    {
        // Assigned in the constructor (after the trace listeners are added), not at type initialization.
        private static Logger _logger;

        // One system handler per event type. These are static but re-created by every
        // constructor call. NOTE(review): presumably static to keep them reachable for
        // the handles given out by Subscribe - confirm.
        private static ClrSystemHandler<IAllocatedEvaluator> _allocatedEvaluatorSubscriber;

        private static ClrSystemHandler<IEvaluatorRequestor> _evaluatorRequestorSubscriber;

        private static ClrSystemHandler<ITaskMessage> _taskMessageSubscriber;

        private static ClrSystemHandler<IActiveContext> _activeContextSubscriber;

        private static ClrSystemHandler<IActiveContext> _driverRestartActiveContextSubscriber;

        private static ClrSystemHandler<IFailedTask> _failedTaskSubscriber;

        private static ClrSystemHandler<IRunningTask> _runningTaskSubscriber;

        private static ClrSystemHandler<IRunningTask> _driverRestartRunningTaskSubscriber;

        private static ClrSystemHandler<ISuspendedTask> _suspendedTaskSubscriber;

        private static ClrSystemHandler<IFailedEvaluator> _failedEvaluatorSubscriber;

        private static ClrSystemHandler<ICompletedEvaluator> _completedEvaluatorSubscriber;

        private static ClrSystemHandler<IHttpMessage> _httpServerEventSubscriber;

        private static ClrSystemHandler<ICompletedTask> _completedTaskSubscriber;

        private static ClrSystemHandler<IClosedContext> _closedContextSubscriber;

        private static ClrSystemHandler<IFailedContext> _failedContextSubscriber;

        private static ClrSystemHandler<IContextMessage> _contextMessageSubscriber;

        private static ClrSystemHandler<StartTime> _driverRestartSubscriber;

        // Injected user handlers, one field per constructor parameter.
        private IObserver<StartTime> _driverRestartHandler;

        private ISet<IObserver<IEvaluatorRequestor>> _evaluatorRequestHandlers;

        private ISet<IObserver<IAllocatedEvaluator>> _allocatedEvaluatorHandlers;

        private ISet<IObserver<IActiveContext>> _activeContextHandlers;

        private ISet<IObserver<IActiveContext>> _driverRestartActiveContextHandlers;

        private ISet<IObserver<ITaskMessage>> _taskMessageHandlers;

        private ISet<IObserver<IFailedTask>> _failedTaskHandlers;

        private ISet<IObserver<ISuspendedTask>> _suspendedTaskHandlers;

        private ISet<IObserver<IRunningTask>> _runningTaskHandlers;

        private ISet<IObserver<IRunningTask>> _driverRestartRunningTaskHandlers;

        private ISet<IObserver<IFailedEvaluator>> _failedEvaluatorHandlers;

        private ISet<IObserver<ICompletedEvaluator>> _completedEvaluatorHandlers;

        private ISet<IObserver<IClosedContext>> _closedContextHandlers;

        private ISet<IObserver<IFailedContext>> _failedContextHandlers;

        private ISet<IObserver<IContextMessage>> _contextMessageHandlers;

        private ISet<IObserver<ICompletedTask>> _completedTaskHandlers;

        private HttpServerHandler _httpServerHandler;

        /// <summary>
        /// Registers the trace listeners, applies the trace level, stores the injected
        /// handlers and creates a fresh system handler for every event type.
        /// </summary>
        /// <param name="traceListeners">Listeners added to the logger before the first log call.</param>
        /// <param name="traceLevel">Name of a <see cref="Level"/> value; an unparsable (or null) value only logs a warning.</param>
        /// <param name="httpServerHandler">Handler subscribed to HTTP messages in <see cref="Subscribe"/>.</param>
        [Inject]
        public DriverBridge(
            [Parameter(Value = typeof(DriverBridgeConfigurationOptions.DriverRestartHandler))] IObserver<StartTime> driverRestartHandler,
            [Parameter(Value = typeof(DriverBridgeConfigurationOptions.EvaluatorRequestHandlers))] ISet<IObserver<IEvaluatorRequestor>> evaluatorRequestHandlers,
            [Parameter(Value = typeof(DriverBridgeConfigurationOptions.AllocatedEvaluatorHandlers))] ISet<IObserver<IAllocatedEvaluator>> allocatedEvaluatorHandlers,
            [Parameter(Value = typeof(DriverBridgeConfigurationOptions.ActiveContextHandlers))] ISet<IObserver<IActiveContext>> activeContextHandlers,
            [Parameter(Value = typeof(DriverBridgeConfigurationOptions.TaskMessageHandlers))] ISet<IObserver<ITaskMessage>> taskMessageHandlers,
            [Parameter(Value = typeof(DriverBridgeConfigurationOptions.FailedTaskHandlers))] ISet<IObserver<IFailedTask>> failedTaskHandlers,
            [Parameter(Value = typeof(DriverBridgeConfigurationOptions.FailedEvaluatorHandlers))] ISet<IObserver<IFailedEvaluator>> failedEvaluatorHandlers,
            [Parameter(Value = typeof(DriverBridgeConfigurationOptions.CompletedEvaluatorHandlers))] ISet<IObserver<ICompletedEvaluator>> completedEvaluatorHandlers,
            [Parameter(Value = typeof(DriverBridgeConfigurationOptions.RunningTaskHandlers))] ISet<IObserver<IRunningTask>> runningTaskHandlers,
            [Parameter(Value = typeof(DriverBridgeConfigurationOptions.CompletedTaskHandlers))] ISet<IObserver<ICompletedTask>> completedTaskHandlers,
            [Parameter(Value = typeof(DriverBridgeConfigurationOptions.SuspendedTaskHandlers))] ISet<IObserver<ISuspendedTask>> suspendedTaskHandlers,
            [Parameter(Value = typeof(DriverBridgeConfigurationOptions.ClosedContextHandlers))] ISet<IObserver<IClosedContext>> closedContextHandlers,
            [Parameter(Value = typeof(DriverBridgeConfigurationOptions.FailedContextHandlers))] ISet<IObserver<IFailedContext>> failedContextHandlers,
            [Parameter(Value = typeof(DriverBridgeConfigurationOptions.ContextMessageHandlers))] ISet<IObserver<IContextMessage>> contextMessageHandlers,
            [Parameter(Value = typeof(DriverBridgeConfigurationOptions.DriverRestartActiveContextHandlers))] ISet<IObserver<IActiveContext>> driverRestartActiveContextHandlers,
            [Parameter(Value = typeof(DriverBridgeConfigurationOptions.DriverRestartRunningTaskHandlers))] ISet<IObserver<IRunningTask>> driverRestartRunningTaskHandlers,
            [Parameter(Value = typeof(DriverBridgeConfigurationOptions.TraceListenersSet))] ISet<TraceListener> traceListeners,
            [Parameter(Value = typeof(DriverBridgeConfigurationOptions.TraceLevel))] string traceLevel,
            HttpServerHandler httpServerHandler)
        {
            // Listeners are registered before the logger is obtained and used.
            foreach (TraceListener listener in traceListeners)
            {
                Logger.AddTraceListner(listener);
            }
            _logger = Logger.GetLogger(typeof(DriverBridge));
            _logger.Log(Level.Info, "Constructing DriverBridge");

            // traceLevel is already a string, so it is parsed directly; a null or unknown
            // value makes TryParse return false and falls through to the warning.
            Level level;
            if (Enum.TryParse(traceLevel, out level))
            {
                Logger.SetCustomLevel(level);
            }
            else
            {
                _logger.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Invalid trace level {0} provided, will by default use verbose level", traceLevel));
            }

            _evaluatorRequestHandlers = evaluatorRequestHandlers;
            _allocatedEvaluatorHandlers = allocatedEvaluatorHandlers;
            _activeContextHandlers = activeContextHandlers;
            _taskMessageHandlers = taskMessageHandlers;
            _failedEvaluatorHandlers = failedEvaluatorHandlers;
            _failedTaskHandlers = failedTaskHandlers;
            _completedTaskHandlers = completedTaskHandlers;
            _runningTaskHandlers = runningTaskHandlers;
            _suspendedTaskHandlers = suspendedTaskHandlers;
            _completedEvaluatorHandlers = completedEvaluatorHandlers;
            _closedContextHandlers = closedContextHandlers;
            _failedContextHandlers = failedContextHandlers;
            _contextMessageHandlers = contextMessageHandlers;
            _driverRestartHandler = driverRestartHandler;
            _driverRestartActiveContextHandlers = driverRestartActiveContextHandlers;
            _driverRestartRunningTaskHandlers = driverRestartRunningTaskHandlers;
            _httpServerHandler = httpServerHandler;

            _evaluatorRequestorSubscriber = new ClrSystemHandler<IEvaluatorRequestor>();
            _allocatedEvaluatorSubscriber = new ClrSystemHandler<IAllocatedEvaluator>();
            _completedEvaluatorSubscriber = new ClrSystemHandler<ICompletedEvaluator>();
            _taskMessageSubscriber = new ClrSystemHandler<ITaskMessage>();
            _activeContextSubscriber = new ClrSystemHandler<IActiveContext>();
            _failedTaskSubscriber = new ClrSystemHandler<IFailedTask>();
            _failedEvaluatorSubscriber = new ClrSystemHandler<IFailedEvaluator>();
            _httpServerEventSubscriber = new ClrSystemHandler<IHttpMessage>();
            _completedTaskSubscriber = new ClrSystemHandler<ICompletedTask>();
            _runningTaskSubscriber = new ClrSystemHandler<IRunningTask>();
            _suspendedTaskSubscriber = new ClrSystemHandler<ISuspendedTask>();
            _closedContextSubscriber = new ClrSystemHandler<IClosedContext>();
            _failedContextSubscriber = new ClrSystemHandler<IFailedContext>();
            _contextMessageSubscriber = new ClrSystemHandler<IContextMessage>();
            _driverRestartSubscriber = new ClrSystemHandler<StartTime>();
            _driverRestartActiveContextSubscriber = new ClrSystemHandler<IActiveContext>();
            _driverRestartRunningTaskSubscriber = new ClrSystemHandler<IRunningTask>();
        }

        /// <summary>
        /// Subscribes every injected handler to the system handler of its event type and
        /// creates a handle for each system handler.
        /// </summary>
        /// <returns>
        /// An array of <c>Constants.HandlersNumber</c> entries, pre-filled with
        /// <c>Constants.NullHandler</c>, where the slot given by <c>Constants.Handlers</c>
        /// for each event type holds the handle created by <c>ClrHandlerHelper.CreateHandler</c>.
        /// </returns>
        public ulong[] Subscribe()
        {
            ulong[] handlers = Enumerable.Repeat(Constants.NullHandler, Constants.HandlersNumber).ToArray();

            // subscribe to StartTime event for driver restart
            _driverRestartSubscriber.Subscribe(_driverRestartHandler);
            _logger.Log(Level.Info, "subscribed to Driver restart handler: " + _driverRestartHandler);
            handlers[Constants.Handlers[Constants.DriverRestartHandler]] = ClrHandlerHelper.CreateHandler(_driverRestartSubscriber);

            // subscribe to Evaluator Requestor
            SubscribeAll(_evaluatorRequestorSubscriber, _evaluatorRequestHandlers, "subscribed to IEvaluatorRequestor handler: ");
            handlers[Constants.Handlers[Constants.EvaluatorRequestorHandler]] = ClrHandlerHelper.CreateHandler(_evaluatorRequestorSubscriber);

            // subscribe to Allocated Evaluator
            SubscribeAll(_allocatedEvaluatorSubscriber, _allocatedEvaluatorHandlers, "subscribed to IAllocatedEvaluator handler: ");
            handlers[Constants.Handlers[Constants.AllocatedEvaluatorHandler]] = ClrHandlerHelper.CreateHandler(_allocatedEvaluatorSubscriber);

            // subscribe to TaskMessage
            SubscribeAll(_taskMessageSubscriber, _taskMessageHandlers, "subscribed to ITaskMessage handler: ");
            handlers[Constants.Handlers[Constants.TaskMessageHandler]] = ClrHandlerHelper.CreateHandler(_taskMessageSubscriber);

            // subscribe to Active Context
            SubscribeAll(_activeContextSubscriber, _activeContextHandlers, "subscribed to IActiveContext handler: ");
            handlers[Constants.Handlers[Constants.ActiveContextHandler]] = ClrHandlerHelper.CreateHandler(_activeContextSubscriber);

            // subscribe to Failed Task
            SubscribeAll(_failedTaskSubscriber, _failedTaskHandlers, "subscribed to IFailedTask handler: ");
            handlers[Constants.Handlers[Constants.FailedTaskHandler]] = ClrHandlerHelper.CreateHandler(_failedTaskSubscriber);

            // subscribe to Running Task
            SubscribeAll(_runningTaskSubscriber, _runningTaskHandlers, "subscribed to IRunningTask handler: ");
            handlers[Constants.Handlers[Constants.RunningTaskHandler]] = ClrHandlerHelper.CreateHandler(_runningTaskSubscriber);

            // subscribe to Completed Task
            SubscribeAll(_completedTaskSubscriber, _completedTaskHandlers, "subscribed to ICompletedTask handler: ");
            handlers[Constants.Handlers[Constants.CompletedTaskHandler]] = ClrHandlerHelper.CreateHandler(_completedTaskSubscriber);

            // subscribe to Suspended Task
            SubscribeAll(_suspendedTaskSubscriber, _suspendedTaskHandlers, "subscribed to ISuspendedTask handler: ");
            handlers[Constants.Handlers[Constants.SuspendedTaskHandler]] = ClrHandlerHelper.CreateHandler(_suspendedTaskSubscriber);

            // subscribe to Failed Evaluator
            SubscribeAll(_failedEvaluatorSubscriber, _failedEvaluatorHandlers, "subscribed to IFailedEvaluator handler: ");
            handlers[Constants.Handlers[Constants.FailedEvaluatorHandler]] = ClrHandlerHelper.CreateHandler(_failedEvaluatorSubscriber);

            // subscribe to Completed Evaluator
            SubscribeAll(_completedEvaluatorSubscriber, _completedEvaluatorHandlers, "subscribed to ICompletedEvaluator handler: ");
            handlers[Constants.Handlers[Constants.CompletedEvaluatorHandler]] = ClrHandlerHelper.CreateHandler(_completedEvaluatorSubscriber);

            // subscribe to Closed Context
            SubscribeAll(_closedContextSubscriber, _closedContextHandlers, "subscribed to IClosedContext handler: ");
            handlers[Constants.Handlers[Constants.ClosedContextHandler]] = ClrHandlerHelper.CreateHandler(_closedContextSubscriber);

            // subscribe to Failed Context
            SubscribeAll(_failedContextSubscriber, _failedContextHandlers, "subscribed to IFailedContext handler: ");
            handlers[Constants.Handlers[Constants.FailedContextHandler]] = ClrHandlerHelper.CreateHandler(_failedContextSubscriber);

            // subscribe to Context Message
            SubscribeAll(_contextMessageSubscriber, _contextMessageHandlers, "subscribed to IContextMessage handler: ");
            handlers[Constants.Handlers[Constants.ContextMessageHandler]] = ClrHandlerHelper.CreateHandler(_contextMessageSubscriber);

            // subscribe to Active Context received during driver restart
            SubscribeAll(_driverRestartActiveContextSubscriber, _driverRestartActiveContextHandlers, "subscribed to handler for IActiveContext received during driver restart: ");
            handlers[Constants.Handlers[Constants.DriverRestartActiveContextHandler]] = ClrHandlerHelper.CreateHandler(_driverRestartActiveContextSubscriber);

            // subscribe to Running Task received during driver restart
            SubscribeAll(_driverRestartRunningTaskSubscriber, _driverRestartRunningTaskHandlers, "subscribed to handler for IRunningTask received during driver restart: ");
            handlers[Constants.Handlers[Constants.DriverRestartRunningTaskHandler]] = ClrHandlerHelper.CreateHandler(_driverRestartRunningTaskSubscriber);

            // subscribe to Http message
            _httpServerEventSubscriber.Subscribe(_httpServerHandler);
            _logger.Log(Level.Info, "subscribed to IHttpMessage handler :" + _httpServerHandler);
            handlers[Constants.Handlers[Constants.HttpServerHandler]] = ClrHandlerHelper.CreateHandler(_httpServerEventSubscriber);

            return handlers;
        }

        /// <summary>
        /// Subscribes each handler in <paramref name="handlers"/> to <paramref name="subscriber"/>
        /// and logs one Info line per handler.
        /// </summary>
        /// <typeparam name="T">Event type shared by the system handler and the observers.</typeparam>
        /// <param name="subscriber">System handler the observers are attached to.</param>
        /// <param name="handlers">Injected observers for this event type.</param>
        /// <param name="logPrefix">Log text placed in front of the handler's string form.</param>
        private static void SubscribeAll<T>(ClrSystemHandler<T> subscriber, ISet<IObserver<T>> handlers, string logPrefix)
        {
            foreach (IObserver<T> handler in handlers)
            {
                subscriber.Subscribe(handler);
                _logger.Log(Level.Info, logPrefix + handler);
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs new file mode 100644 index 0000000..7b7b280 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs @@ -0,0 +1,274 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Common.Context; +using Org.Apache.REEF.Common.Evaluator; +using Org.Apache.REEF.Driver.Context; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Wake.Time; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Globalization; +using Org.Apache.REEF.Wake.Time.Event; + +[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1401:FieldsMustBePrivate", Justification = "static field, typical usage in configurations")] + +namespace Org.Apache.REEF.Driver.Bridge +{ + public class DriverBridgeConfiguration : ConfigurationModuleBuilder + { + /// <summary> + /// The event handler invoked right after the driver boots up. 
+ /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly RequiredImpl<IStartHandler> OnDriverStarted = new RequiredImpl<IStartHandler>(); + + /// <summary> + /// The event handler invoked when driver restarts + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<StartTime>> OnDriverRestarted = new OptionalImpl<IObserver<StartTime>>(); + + /// <summary> + /// The event handler for requesting evaluator + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<IEvaluatorRequestor>> OnEvaluatorRequested = new OptionalImpl<IObserver<IEvaluatorRequestor>>(); + + /// <summary> + /// Event handler for allocated evaluators. Defaults to returning the evaluator if not bound. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<IAllocatedEvaluator>> OnEvaluatorAllocated = new OptionalImpl<IObserver<IAllocatedEvaluator>>(); + + /// <summary> + /// Event handler for completed evaluators. Defaults to logging if not bound. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<ICompletedEvaluator>> OnEvaluatorCompleted = new OptionalImpl<IObserver<ICompletedEvaluator>>(); + + /// <summary> + /// Event handler for failed evaluators. Defaults to job failure if not bound. 
+ /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<IFailedEvaluator>> OnEvaluatorFailed = new OptionalImpl<IObserver<IFailedEvaluator>>(); + + /// <summary> + /// Event handler for HTTP events. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IHttpHandler> OnHttpEvent = new OptionalImpl<IHttpHandler>(); + + /// <summary> + /// Event handler for task messages. Defaults to logging if not bound. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<ITaskMessage>> OnTaskMessage = new OptionalImpl<IObserver<ITaskMessage>>(); + + /// <summary> + /// Event handler for completed tasks. Defaults to closing the context the task ran on if not bound. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<ICompletedTask>> OnTaskCompleted = new OptionalImpl<IObserver<ICompletedTask>>(); + + /// <summary> + /// Event handler for failed tasks. Defaults to job failure if not bound. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<IFailedTask>> OnTaskFailed = new OptionalImpl<IObserver<IFailedTask>>(); + + ///// <summary> + ///// Event handler for running tasks. Defaults to logging if not bound. 
+ ///// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<IRunningTask>> OnTaskRunning = new OptionalImpl<IObserver<IRunningTask>>(); + + ///// <summary> + ///// Event handler for running task received during driver restart. Defaults to logging if not bound. + ///// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<IRunningTask>> OnDriverRestartTaskRunning = new OptionalImpl<IObserver<IRunningTask>>(); + + /// <summary> + /// Event handler for suspended tasks. Defaults to job failure if not bound. Rationale: many jobs don't support + /// task suspension. Hence, this parameter should be optional. The only sane default is to crash the job, then. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<ISuspendedTask>> OnTaskSuspended = new OptionalImpl<IObserver<ISuspendedTask>>(); + + /// <summary> + /// Event handler for active context. Defaults to closing the context if not bound. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<IActiveContext>> OnContextActive = new OptionalImpl<IObserver<IActiveContext>>(); + + /// <summary> + /// Event handler for active context received during driver restart. Defaults to closing the context if not bound. 
+ /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<IActiveContext>> OnDirverRestartContextActive = new OptionalImpl<IObserver<IActiveContext>>(); + + /// <summary> + /// Event handler for closed context. Defaults to logging if not bound. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<IClosedContext>> OnContextClosed = new OptionalImpl<IObserver<IClosedContext>>(); + + /// <summary> + /// Event handler for closed context. Defaults to job failure if not bound. + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<IFailedContext>> OnContextFailed = new OptionalImpl<IObserver<IFailedContext>>(); + + /// <summary> + /// Event handler for context messages. Defaults to logging if not bound. 
+ /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<IContextMessage>> OnContextMessage = new OptionalImpl<IObserver<IContextMessage>>(); + + /// <summary> + /// Additional set of string arguments that can be pssed to handlers through client + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalParameter<string> CommandLineArguments = new OptionalParameter<string>(); + + /// <summary> + /// The trace level of the TraceListner + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalParameter<string> CustomTraceLevel = new OptionalParameter<string>(); + + /// <summary> + /// Additional set of trace listners provided by client + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalParameter<TraceListener> CustomTraceListeners = new OptionalParameter<TraceListener>(); + + /// <summary> + /// The implemenation for (attempting to) re-establish connection to driver + /// </summary> + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IDriverConnection> OnDriverReconnect = new OptionalImpl<IDriverConnection>(); + + // This is currently not needed in Bridge/Driver model + ///// <summary> + ///// The event handler invoked right before the driver shuts down. Defaults to ignore. 
+ ///// </summary> + //public static readonly OptionalImpl<IObserver<StopTime>> OnDriverStop = new OptionalImpl<IObserver<StopTime>>(); + + // Client handlers only needed when client interactions are expeceted. Not enabled for now. + ///// <summary> + ///// Event handler for client messages. Defaults to logging if not bound. + ///// </summary> + //public static readonly OptionalImpl<IObserver<byte[]>> OnClientMessage = new OptionalImpl<IObserver<byte[]>>(); + + // Client handlers only needed when client interactions are expeceted. Not enabled for now. + ///// <summary> + ///// Event handler for close messages sent by the client. Defaults to job failure if not bound. + ///// Note: in java the type is void, but IObserver does not take void as a type + ///// </summary> + //public static readonly OptionalImpl<IObserver<byte[]>> OnClientClosed = new OptionalImpl<IObserver<byte[]>>(); + + // Client handlers only needed when client interactions are expeceted. Not enabled for now. + ///// <summary> + ///// Event handler for close messages sent by the client. Defaults to job failure if not bound. 
///// </summary>
        //public static readonly OptionalImpl<IObserver<byte[]>> OnClientClosedMessage = new OptionalImpl<IObserver<byte[]>>();

        /// <summary>
        /// Builds a fresh configuration module on every access, wiring each handler slot declared on
        /// this class to its named parameter in <see cref="DriverBridgeConfigurationOptions"/>.
        /// </summary>
        public static ConfigurationModule ConfigurationModule
        {
            get
            {
                return new DriverBridgeConfiguration()
                    .BindImplementation(GenericType<IStartHandler>.Class, OnDriverStarted)
                    .BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.DriverRestartHandler>.Class, OnDriverRestarted)
                    .BindImplementation(GenericType<IDriverConnection>.Class, OnDriverReconnect)
                    .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.EvaluatorRequestHandlers>.Class, OnEvaluatorRequested)
                    .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.AllocatedEvaluatorHandlers>.Class, OnEvaluatorAllocated)
                    .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.ActiveContextHandlers>.Class, OnContextActive)
                    .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.TaskMessageHandlers>.Class, OnTaskMessage)
                    .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.FailedTaskHandlers>.Class, OnTaskFailed)
                    .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.RunningTaskHandlers>.Class, OnTaskRunning)
                    .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.SuspendedTaskHandlers>.Class, OnTaskSuspended)
                    .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.FailedEvaluatorHandlers>.Class, OnEvaluatorFailed)
                    .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.CompletedEvaluatorHandlers>.Class, OnEvaluatorCompleted)
                    .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.CompletedTaskHandlers>.Class, OnTaskCompleted)
                    .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.ClosedContextHandlers>.Class, OnContextClosed)
                    .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.FailedContextHandlers>.Class, OnContextFailed)
                    .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.ContextMessageHandlers>.Class, OnContextMessage)
                    .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.ArgumentSets>.Class, CommandLineArguments)
                    .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.HttpEventHandlers>.Class, OnHttpEvent)
                    .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.TraceListenersSet>.Class, CustomTraceListeners)
                    .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartActiveContextHandlers>.Class, OnDirverRestartContextActive)
                    .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartRunningTaskHandlers>.Class, OnDriverRestartTaskRunning)
                    .BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.TraceLevel>.Class, CustomTraceLevel)
                    .Build();
            }
        }
    }

    /// <summary>
    /// Injectable holder for the string arguments bound to
    /// <see cref="DriverBridgeConfigurationOptions.ArgumentSets"/>.
    /// </summary>
    public class CommandLineArguments
    {
        /// <param name="arguments">The injected argument set; stored as-is.</param>
        [Inject]
        public CommandLineArguments([Parameter(typeof(DriverBridgeConfigurationOptions.ArgumentSets))] ISet<string> arguments)
        {
            Arguments = arguments;
        }

        /// <summary>The argument set received at construction.</summary>
        public ISet<string> Arguments { get; set; }
    }

    /// <summary>
    /// Injectable holder for the trace listeners bound to
    /// <see cref="DriverBridgeConfigurationOptions.TraceListenersSet"/>.
    /// </summary>
    public class CustomTraceListeners
    {
        /// <param name="listeners">The injected listener set; stored as-is.</param>
        [Inject]
        public CustomTraceListeners([Parameter(typeof(DriverBridgeConfigurationOptions.TraceListenersSet))] ISet<TraceListener> listeners)
        {
            Listeners = listeners;
        }

        /// <summary>The listener set received at construction.</summary>
        public ISet<TraceListener> Listeners { get; set; }
    }

    /// <summary>
    /// Parses the string bound to <see cref="DriverBridgeConfigurationOptions.TraceLevel"/> into a
    /// <see cref="Level"/> and, when parsing succeeds, applies it through Logger.SetCustomLevel.
    /// </summary>
    public class CustomTraceLevel
    {
        /// <param name="traceLevel">
        /// Name of a <see cref="Level"/> member. A null or unparsable value is reported on the console
        /// and <see cref="TraceLevel"/> falls back to <see cref="Level.Verbose"/>.
        /// </param>
        [Inject]
        public CustomTraceLevel([Parameter(typeof(DriverBridgeConfigurationOptions.TraceLevel))] string traceLevel)
        {
            // Enum.TryParse overwrites its out argument with default(Level) on failure, so the
            // fallback has to be assigned explicitly rather than pre-seeded into the out variable.
            Level parsed;
            if (Enum.TryParse(traceLevel, out parsed))
            {
                Logger.SetCustomLevel(parsed);
                TraceLevel = parsed;
            }
            else
            {
                Console.WriteLine("Cannot parse trace level " + traceLevel);
                TraceLevel = Level.Verbose;
            }
        }

        /// <summary>The parsed level, or <see cref="Level.Verbose"/> when parsing failed.</summary>
        public Level TraceLevel { get; set; }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs ---------------------------------------------------------------------- diff --git
a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs new file mode 100644 index 0000000..9bc2402 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

using Org.Apache.REEF.Common.Context;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Defaults;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Driver.Task;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Wake.Time;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using Org.Apache.REEF.Wake.Time.Event;

[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1402:FileMayOnlyContainASingleClass", Justification = "allow name parameter class to be embedded")]

namespace Org.Apache.REEF.Driver.Bridge
{
    /// <summary>
    /// Container for the named parameters used by Drivers, bridge handlers included.
    /// Each nested class is a Tang name; its attribute carries the documentation and defaults.
    /// </summary>
    public class DriverBridgeConfigurationOptions
    {
        // String form of Level.Verbose: TANG does not support enums, so the default is given by name.
        private const string DefaultTraceLevelName = "Verbose";

        /// <summary>Name of the single handler observing driver restart.</summary>
        [NamedParameter(documentation: "Called when driver is restarted, after CLR bridge is set up.", defaultClasses: new[] { typeof(DefaultDriverRestartHandler) })]
        public class DriverRestartHandler : Name<IObserver<StartTime>>
        {
        }

        /// <summary>Name of the handler set observing evaluator requestors.</summary>
        [NamedParameter(documentation: "Called when evaluator is requested.", defaultClasses: new[] { typeof(DefaultEvaluatorRequestorHandler) })]
        public class EvaluatorRequestHandlers : Name<ISet<IObserver<IEvaluatorRequestor>>>
        {
        }

        /// <summary>Name of the handler set observing failed evaluators.</summary>
        [NamedParameter(documentation: "Called when an exception occurs on a running evaluator.", defaultClasses: new[] { typeof(DefaultEvaluatorFailureHandler) })]
        public class FailedEvaluatorHandlers : Name<ISet<IObserver<IFailedEvaluator>>>
        {
        }

        /// <summary>Name of the handler set observing completed evaluators.</summary>
        [NamedParameter(documentation: "Called when an evaluator completes.", defaultClasses: new[] { typeof(DefaultEvaluatorCompletionHandler) })]
        public class CompletedEvaluatorHandlers : Name<ISet<IObserver<ICompletedEvaluator>>>
        {
        }

        /// <summary>Name of the handler set observing allocated evaluators.</summary>
        [NamedParameter(documentation: "Called when an allocated evaluator is given to the client.", defaultClasses: new[] { typeof(DefaultEvaluatorAllocationHandler) })]
        public class AllocatedEvaluatorHandlers : Name<ISet<IObserver<IAllocatedEvaluator>>>
        {
        }

        /// <summary>Name of the handler set observing running tasks.</summary>
        [NamedParameter(documentation: "Running task handler.", defaultClasses: new[] { typeof(DefaultTaskRunningHandler) })]
        public class RunningTaskHandlers : Name<ISet<IObserver<IRunningTask>>>
        {
        }

        /// <summary>Name of the handler set observing running tasks reported during driver restart.</summary>
        [NamedParameter(documentation: "Running task during driver restart handler.", defaultClasses: new[] { typeof(DefaultDriverRestartTaskRunningHandler) })]
        public class DriverRestartRunningTaskHandlers : Name<ISet<IObserver<IRunningTask>>>
        {
        }

        /// <summary>Name of the handler set observing failed tasks.</summary>
        [NamedParameter(documentation: "Task exception handler.", defaultClasses: new[] { typeof(DefaultTaskFailureHandler) })]
        public class FailedTaskHandlers : Name<ISet<IObserver<IFailedTask>>>
        {
        }

        /// <summary>Name of the handler set observing task messages.</summary>
        [NamedParameter(documentation: "Task message handler.", defaultClasses: new[] { typeof(DefaultTaskMessageHandler) })]
        public class TaskMessageHandlers : Name<ISet<IObserver<ITaskMessage>>>
        {
        }

        /// <summary>Name of the set of HTTP handlers.</summary>
        [NamedParameter(documentation: "Http Event Handlers.", defaultClasses: new[] { typeof(DefaultHttpHandler) })]
        public class HttpEventHandlers : Name<ISet<IHttpHandler>>
        {
        }

        /// <summary>Name of the handler set observing completed tasks.</summary>
        [NamedParameter(documentation: "Completed task handler.", defaultClasses: new[] { typeof(DefaultTaskCompletionHandler) })]
        public class CompletedTaskHandlers : Name<ISet<IObserver<ICompletedTask>>>
        {
        }

        /// <summary>Name of the handler set observing suspended tasks.</summary>
        [NamedParameter(documentation: "Suspended task handler.", defaultClasses: new[] { typeof(DefaultTaskSuspensionHandler) })]
        public class SuspendedTaskHandlers : Name<ISet<IObserver<ISuspendedTask>>>
        {
        }

        /// <summary>Name of the handler set observing active contexts.</summary>
        [NamedParameter(documentation: "Handler for IActiveContext.", defaultClasses: new[] { typeof(DefaultContextActiveHandler) })]
        public class ActiveContextHandlers : Name<ISet<IObserver<IActiveContext>>>
        {
        }

        /// <summary>Name of the handler set observing active contexts reported during driver restart.</summary>
        [NamedParameter(documentation: "Handler for IActiveContext received during driver restart.", defaultClasses: new[] { typeof(DefaultDriverRestartContextActiveHandler) })]
        public class DriverRestartActiveContextHandlers : Name<ISet<IObserver<IActiveContext>>>
        {
        }

        /// <summary>Name of the handler set observing closed contexts.</summary>
        [NamedParameter(documentation: "Handler for ClosedContext.", defaultClasses: new[] { typeof(DefaultContextClosureHandler) })]
        public class ClosedContextHandlers : Name<ISet<IObserver<IClosedContext>>>
        {
        }

        /// <summary>Name of the handler set observing failed contexts.</summary>
        [NamedParameter(documentation: "Handler for FailedContext.", defaultClasses: new[] { typeof(DefaultContextFailureHandler) })]
        public class FailedContextHandlers : Name<ISet<IObserver<IFailedContext>>>
        {
        }

        /// <summary>Name of the handler set observing context messages.</summary>
        [NamedParameter(documentation: "Handler for ContextMessage.", defaultClasses: new[] { typeof(DefaultContextMessageHandler) })]
        public class ContextMessageHandlers : Name<ISet<IObserver<IContextMessage>>>
        {
        }

        /// <summary>Name of the set of string arguments supplied by the client.</summary>
        [NamedParameter("Command Line Arguments supplied by client", "CommandLineArguments", null)]
        public class ArgumentSets : Name<ISet<string>>
        {
        }

        /// <summary>Name of the set of additional trace listeners supplied by the client.</summary>
        [NamedParameter("Additional trace listners supplied by client", "TraceListeners", null, defaultClasses: new[] { typeof(DefaultCustomTraceListener) })]
        public class TraceListenersSet : Name<ISet<TraceListener>>
        {
        }

        /// <summary>Name of the trace level, carried as a string and defaulting to "Verbose".</summary>
        [NamedParameter("Custom Trace Level", "TraceLevel", defaultValue: DefaultTraceLevelName)]
        public class TraceLevel : Name<string>
        {
        }

        //[NamedParameter(documentation: "Job message handler.", defaultClasses: new[] { typeof(DefaultClientMessageHandler) })]
        //public class ClientMessageHandlers : Name<ISet<IObserver<byte[]>>>
        //{
        //}

        //[NamedParameter(documentation: "Client close handler.", defaultClasses: new[] { typeof(DefaultClientCloseHandler) })]
        //public class ClientCloseHandlers : Name<ISet<IObserver<byte[]>>>
        //{
        //}
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ActiveContext.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ActiveContext.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ActiveContext.cs new file mode 100644 index 0000000..989483b --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ActiveContext.cs @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

using System;
using System.Runtime.Serialization;
using Org.Apache.REEF.Driver.Bridge.Clr2java;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Driver.Bridge.Events
{
    /// <summary>
    /// <see cref="IActiveContext"/> implementation that forwards to an
    /// <see cref="IActiveContextClr2Java"/> bridge object.
    /// </summary>
    [DataContract]
    internal class ActiveContext : IActiveContext
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ActiveContext));

        private readonly AvroConfigurationSerializer _serializer;

        /// <summary>
        /// Wraps the given bridge object and assigns a new GUID-based instance id.
        /// </summary>
        /// <param name="clr2Java">Bridge object every call is forwarded to.</param>
        public ActiveContext(IActiveContextClr2Java clr2Java)
        {
            InstanceId = Guid.NewGuid().ToString("N");
            Clr2Java = clr2Java;
            _serializer = new AvroConfigurationSerializer();
        }

        /// <summary>GUID ("N" format) generated when this wrapper was created.</summary>
        [DataMember]
        public string InstanceId { get; set; }

        /// <summary>Context id read from the bridge; the setter ignores its value.</summary>
        public string Id
        {
            get { return Clr2Java.GetId(); }
            set { }
        }

        /// <summary>Evaluator id read from the bridge; the setter ignores its value.</summary>
        public string EvaluatorId
        {
            get { return Clr2Java.GetEvaluatorId(); }
            set { }
        }

        /// <summary>Parent context id; not populated by this class.</summary>
        public Optional<string> ParentId { get; set; }

        /// <summary>Evaluator descriptor read from the bridge; the setter ignores its value.</summary>
        public IEvaluatorDescriptor EvaluatorDescriptor
        {
            get { return Clr2Java.GetEvaluatorDescriptor(); }
            set { }
        }

        private IActiveContextClr2Java Clr2Java { get; set; }

        /// <summary>
        /// Serializes the task configuration to a string, logs it, and submits it through the bridge.
        /// </summary>
        /// <param name="taskConfiguration">Configuration of the task to submit.</param>
        public void SubmitTask(IConfiguration taskConfiguration)
        {
            LOGGER.Log(Level.Info, "ActiveContext::SubmitTask");
            string serializedTask = _serializer.ToString(taskConfiguration);
            LOGGER.Log(Level.Info, "serialized taskConfiguration: " + serializedTask);
            Clr2Java.SubmitTask(serializedTask);
        }

        /// <summary>Logs the call and closes the underlying bridge object.</summary>
        public void Dispose()
        {
            LOGGER.Log(Level.Info, "ActiveContext::Dispose");
            Clr2Java.Close();
        }

        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
        public void SubmitContext(IConfiguration contextConfiguration)
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
        public void SubmitContextAndService(IConfiguration contextConfiguration, IConfiguration serviceConfiguration)
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
        public void SendMessage(byte[] message)
        {
            throw new NotImplementedException();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs new file mode 100644 index 0000000..7db4be8 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */

using System;
using System.Collections.Generic;
using System.Globalization;
using System.Runtime.Serialization;
using Org.Apache.REEF.Common.Catalog;
using Org.Apache.REEF.Common.Evaluator;
using Org.Apache.REEF.Driver.Bridge.Clr2java;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Driver.Bridge.Events
{
    /// <summary>
    /// <see cref="IAllocatedEvaluator"/> implementation that forwards submissions to an
    /// <see cref="IAllocatedEvaluaotrClr2Java"/> bridge object as serialized configuration strings.
    /// </summary>
    [DataContract]
    internal class AllocatedEvaluator : IAllocatedEvaluator
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(AllocatedEvaluator));

        private readonly AvroConfigurationSerializer _serializer;

        private IEvaluatorDescriptor _evaluatorDescriptor;

        /// <summary>
        /// Wraps the bridge object, reads its id, matches it against the pending evaluator requests,
        /// and reads the name server info.
        /// </summary>
        /// <param name="clr2Java">Bridge object for the allocated evaluator.</param>
        public AllocatedEvaluator(IAllocatedEvaluaotrClr2Java clr2Java)
        {
            InstanceId = Guid.NewGuid().ToString("N");
            _serializer = new AvroConfigurationSerializer();
            Clr2Java = clr2Java;

            // Id must be set before ProcessNewEvaluator, which includes it in its log message.
            Id = Clr2Java.GetId();
            ProcessNewEvaluator();

            NameServerInfo = Clr2Java.GetNameServerInfo();
        }

        /// <summary>GUID ("N" format) generated when this wrapper was created.</summary>
        [DataMember]
        public string InstanceId { get; set; }

        /// <summary>Evaluator id read from the bridge at construction.</summary>
        public string Id { get; set; }

        /// <summary>Batch id taken from the matching pending request, if one was found.</summary>
        public string EvaluatorBatchId { get; set; }

        public EvaluatorType Type { get; set; }

        /// <summary>Name server info read from the bridge at construction.</summary>
        public string NameServerInfo { get; set; }

        [DataMember]
        private IAllocatedEvaluaotrClr2Java Clr2Java { get; set; }

        /// <summary>Serializes, logs, and submits a context configuration.</summary>
        public void SubmitContext(IConfiguration contextConfiguration)
        {
            LOGGER.Log(Level.Info, "AllocatedEvaluator::SubmitContext");
            string serializedContext = _serializer.ToString(contextConfiguration);
            LOGGER.Log(Level.Info, "serialized contextConfiguration: " + serializedContext);
            Clr2Java.SubmitContext(serializedContext);
        }

        /// <summary>Serializes, logs, and submits a context together with a task.</summary>
        public void SubmitContextAndTask(IConfiguration contextConfiguration, IConfiguration taskConfiguration)
        {
            LOGGER.Log(Level.Info, "AllocatedEvaluator::SubmitContextAndTask");

            // Both configurations are serialized before either is logged.
            string serializedContext = _serializer.ToString(contextConfiguration);
            string serializedTask = _serializer.ToString(taskConfiguration);

            LOGGER.Log(Level.Info, "serialized contextConfiguration: " + serializedContext);
            LOGGER.Log(Level.Info, "serialized taskConfiguration: " + serializedTask);

            Clr2Java.SubmitContextAndTask(serializedContext, serializedTask);
        }

        /// <summary>Serializes, logs, and submits a context together with a service.</summary>
        public void SubmitContextAndService(IConfiguration contextConfiguration, IConfiguration serviceConfiguration)
        {
            LOGGER.Log(Level.Info, "AllocatedEvaluator::SubmitContextAndService");

            string serializedContext = _serializer.ToString(contextConfiguration);
            string serializedService = _serializer.ToString(serviceConfiguration);

            LOGGER.Log(Level.Info, "serialized contextConfiguration: " + serializedContext);
            LOGGER.Log(Level.Info, "serialized serviceConfiguration: " + serializedService);

            Clr2Java.SubmitContextAndService(serializedContext, serializedService);
        }

        /// <summary>Serializes, logs, and submits a context together with a service and a task.</summary>
        public void SubmitContextAndServiceAndTask(IConfiguration contextConfiguration, IConfiguration serviceConfiguration, IConfiguration taskConfiguration)
        {
            LOGGER.Log(Level.Info, "AllocatedEvaluator::SubmitContextAndServiceAndTask");

            string serializedContext = _serializer.ToString(contextConfiguration);
            string serializedService = _serializer.ToString(serviceConfiguration);
            string serializedTask = _serializer.ToString(taskConfiguration);

            LOGGER.Log(Level.Info, "serialized contextConfiguration: " + serializedContext);
            LOGGER.Log(Level.Info, "serialized serviceConfiguration: " + serializedService);
            LOGGER.Log(Level.Info, "serialized taskConfiguration: " + serializedTask);

            Clr2Java.SubmitContextAndServiceAndTask(serializedContext, serializedService, serializedTask);
        }

        /// <summary>Returns the descriptor read from the bridge at construction.</summary>
        public IEvaluatorDescriptor GetEvaluatorDescriptor()
        {
            return _evaluatorDescriptor;
        }

        /// <summary>Closes the underlying bridge object.</summary>
        public void Dispose()
        {
            Clr2Java.Close();
        }

        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
        public INodeDescriptor GetNodeDescriptor()
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
        public void AddFile(string file)
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
        public void AddLibrary(string file)
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
        public void AddFileResource(string file)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// Reads the evaluator descriptor from the bridge and, under a lock on
        /// EvaluatorRequestor.Evaluators, removes the first pending request whose descriptor equals it.
        /// The batch id is the request key up to its last '_'.
        /// </summary>
        private void ProcessNewEvaluator()
        {
            _evaluatorDescriptor = Clr2Java.GetEvaluatorDescriptor();
            lock (EvaluatorRequestor.Evaluators)
            {
                foreach (KeyValuePair<string, IEvaluatorDescriptor> request in EvaluatorRequestor.Evaluators)
                {
                    if (!request.Value.Equals(_evaluatorDescriptor))
                    {
                        continue;
                    }

                    string requestKey = request.Key;

                    // Removing while enumerating is safe only because the loop exits right after.
                    EvaluatorRequestor.Evaluators.Remove(requestKey);

                    // NOTE(review): assumes every key contains '_' (otherwise Substring throws) - confirm
                    // against the code that populates EvaluatorRequestor.Evaluators.
                    string batchId = requestKey.Substring(0, requestKey.LastIndexOf('_'));

                    LOGGER.Log(
                        Level.Verbose,
                        string.Format(
                            CultureInfo.InvariantCulture,
                            "Received evalautor [{0}] of memory {1}MB that matches request of {2}MB with batch id [{3}].",
                            Id,
                            _evaluatorDescriptor.Memory,
                            request.Value.Memory,
                            batchId));
                    EvaluatorBatchId = batchId;
                    break;
                }
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ClosedContext.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ClosedContext.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ClosedContext.cs new file mode 100644 index 0000000..693d520 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ClosedContext.cs @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */

using System;
using System.Runtime.Serialization;
using Org.Apache.REEF.Driver.Bridge.Clr2java;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Utilities;

namespace Org.Apache.REEF.Driver.Bridge.Events
{
    /// <summary>
    /// <see cref="IClosedContext"/> implementation built from an <see cref="IClosedContextClr2Java"/>
    /// bridge object. The context id and evaluator id are read once at construction.
    /// </summary>
    public class ClosedContext : IClosedContext
    {
        private string _id;

        private string _evaluatorId;

        /// <summary>
        /// Captures the ids from the bridge object and keeps the bridge object for later
        /// evaluator descriptor lookups.
        /// </summary>
        /// <param name="clr2java">Bridge object describing the closed context.</param>
        public ClosedContext(IClosedContextClr2Java clr2java)
        {
            InstanceId = Guid.NewGuid().ToString("N");
            _id = clr2java.GetId();
            _evaluatorId = clr2java.GetEvaluatorId();

            // Keep the bridge object: the EvaluatorDescriptor getter dereferences it, and it was
            // previously never assigned, so that getter always threw NullReferenceException.
            ClosedContextClr2JavaClr2Java = clr2java;
        }

        /// <summary>GUID ("N" format) generated when this wrapper was created.</summary>
        [DataMember]
        public string InstanceId { get; set; }

        /// <summary>Context id captured at construction; the setter ignores its value.</summary>
        public string Id
        {
            get
            {
                return _id;
            }

            set
            {
            }
        }

        /// <summary>Evaluator id captured at construction; the setter ignores its value.</summary>
        public string EvaluatorId
        {
            get
            {
                return _evaluatorId;
            }

            set
            {
            }
        }

        /// <summary>Parent context id; not populated by this class.</summary>
        public Optional<string> ParentId { get; set; }

        /// <summary>Evaluator descriptor read from the bridge object; the setter ignores its value.</summary>
        public IEvaluatorDescriptor EvaluatorDescriptor
        {
            get
            {
                return ClosedContextClr2JavaClr2Java.GetEvaluatorDescriptor();
            }

            set
            {
            }
        }

        /// <summary>
        /// A new <see cref="ActiveContext"/> wrapper around the parent context bridge object;
        /// the setter ignores its value.
        /// </summary>
        public IActiveContext ParentContext
        {
            get
            {
                // NOTE(review): ParentContextClr2Java is never assigned in this class, so this wraps
                // null. Populating it needs a parent-context accessor on IClosedContextClr2Java,
                // which is not visible here - confirm and wire it up in the constructor.
                return new ActiveContext(ParentContextClr2Java);
            }

            set
            {
            }
        }

        private IActiveContextClr2Java ParentContextClr2Java { get; set; }

        private IClosedContextClr2Java ClosedContextClr2JavaClr2Java { get; set; }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedEvaluator.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedEvaluator.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedEvaluator.cs new file mode 100644 index 0000000..096599a --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedEvaluator.cs @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

using System;
using System.Runtime.Serialization;
using Org.Apache.REEF.Driver.Bridge.Clr2java;
using Org.Apache.REEF.Driver.Evaluator;

namespace Org.Apache.REEF.Driver.Bridge.Events
{
    /// <summary>
    /// <see cref="ICompletedEvaluator"/> implementation that reads its id from an
    /// <see cref="ICompletedEvaluatorClr2Java"/> bridge object.
    /// </summary>
    [DataContract]
    internal class CompletedEvaluator : ICompletedEvaluator
    {
        /// <summary>
        /// Wraps the given bridge object and assigns a new GUID-based instance id.
        /// </summary>
        /// <param name="clr2Java">Bridge object for the completed evaluator.</param>
        public CompletedEvaluator(ICompletedEvaluatorClr2Java clr2Java)
        {
            InstanceId = Guid.NewGuid().ToString("N");
            CompletedEvaluatorClr2Java = clr2Java;
        }

        /// <summary>GUID ("N" format) generated when this wrapper was created.</summary>
        [DataMember]
        public string InstanceId { get; set; }

        /// <summary>Evaluator id read from the bridge on each access; the setter ignores its value.</summary>
        [DataMember]
        public string Id
        {
            get { return CompletedEvaluatorClr2Java.GetId(); }
            set { }
        }

        /// <summary>The wrapped bridge object.</summary>
        [DataMember]
        public ICompletedEvaluatorClr2Java CompletedEvaluatorClr2Java { get; set; }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedTask.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedTask.cs new file mode 100644 index 0000000..3fb76b2 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedTask.cs @@ -0,0 +1,76 @@ +/** +
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

using System;
using System.Runtime.Serialization;
using Org.Apache.REEF.Driver.Bridge.Clr2java;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Task;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Driver.Bridge.Events
{
    /// <summary>
    /// <see cref="ICompletedTask"/> implementation backed by an
    /// <see cref="ICompletedTaskClr2Java"/> bridge object.
    /// </summary>
    [DataContract]
    internal class CompletedTask : ICompletedTask
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(CompletedTask));

        /// <summary>
        /// Wraps the bridge object and fetches the bridge object of the active context from it.
        /// </summary>
        /// <param name="completedTaskClr2Java">Bridge object for the completed task.</param>
        internal CompletedTask(ICompletedTaskClr2Java completedTaskClr2Java)
        {
            InstanceId = Guid.NewGuid().ToString("N");
            CompletedTaskClr2Java = completedTaskClr2Java;
            ActiveContextClr2Java = completedTaskClr2Java.GetActiveContext();
        }

        /// <summary>GUID ("N" format) generated when this wrapper was created.</summary>
        [DataMember]
        public string InstanceId { get; set; }

        /// <summary>Task message; not populated by this class.</summary>
        public byte[] Message { get; set; }

        /// <summary>Task id read from the bridge on each access; the setter ignores its value.</summary>
        public string Id
        {
            get { return CompletedTaskClr2Java.GetId(); }
            set { }
        }

        /// <summary>
        /// A new <see cref="Events.ActiveContext"/> wrapper on every access, built around the context
        /// bridge object captured at construction; the setter ignores its value.
        /// </summary>
        public IActiveContext ActiveContext
        {
            get { return new ActiveContext(ActiveContextClr2Java); }
            set { }
        }

        [DataMember]
        private ICompletedTaskClr2Java CompletedTaskClr2Java { get; set; }

        [DataMember]
        private IActiveContextClr2Java ActiveContextClr2Java { get; set; }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ContextMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ContextMessage.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ContextMessage.cs new file mode 100644 index 0000000..afebc59 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ContextMessage.cs @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */

using Org.Apache.REEF.Common.Context;
using Org.Apache.REEF.Driver.Bridge.Clr2java;

namespace Org.Apache.REEF.Driver.Bridge.Events
{
    /// <summary>
    /// Snapshot of a context message: the values are read from the bridge
    /// object once, in the constructor, and kept in read-only fields.
    /// </summary>
    public class ContextMessage : IContextMessage
    {
        private readonly string _sourceId;
        private readonly byte[] _payload;
        private readonly string _contextId;

        /// <summary>
        /// Copies the message source id, the payload bytes and the id out of
        /// the bridge object.
        /// </summary>
        /// <param name="clr2Java">Bridge object to read the message from.</param>
        public ContextMessage(IContextMessageClr2Java clr2Java)
        {
            _sourceId = clr2Java.GetMessageSourceId();
            _payload = clr2Java.Get();
            _contextId = clr2Java.GetId();
        }

        /// <summary>
        /// Id captured at construction. The setter discards its value.
        /// </summary>
        public string Id
        {
            get
            {
                return _contextId;
            }

            set
            {
                // Intentionally a no-op.
            }
        }

        /// <summary>
        /// Message source id captured at construction.
        /// </summary>
        public string MessageSourceId
        {
            get
            {
                return _sourceId;
            }
        }

        /// <summary>
        /// Payload bytes captured at construction (the same array instance
        /// the bridge object returned).
        /// </summary>
        public byte[] Message
        {
            get
            {
                return _payload;
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequstor.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequstor.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequstor.cs new file mode 100644 index 0000000..0be5c9b --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequstor.cs @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */

using System;
using System.Collections.Generic;
using System.Globalization;
using System.Runtime.Serialization;
using Org.Apache.REEF.Common.Catalog;
using Org.Apache.REEF.Common.Evaluator;
using Org.Apache.REEF.Driver.Bridge.Clr2java;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Driver.Bridge.Events
{
    /// <summary>
    /// Submits evaluator requests through an <see cref="IEvaluatorRequestorClr2Java"/>
    /// bridge object and records a descriptor for every requested evaluator in
    /// the process-wide <see cref="Evaluators"/> dictionary.
    /// </summary>
    [DataContract]
    internal class EvaluatorRequestor : IEvaluatorRequestor
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorRequestor));

        // Created eagerly by the type initializer instead of lazily in the
        // Evaluators getter. The previous unsynchronized "if null then create"
        // allowed two threads to each build a dictionary, so lock (Evaluators)
        // could lock different objects and registrations could be lost.
        private static readonly Dictionary<string, IEvaluatorDescriptor> _evaluators =
            new Dictionary<string, IEvaluatorDescriptor>();

        /// <summary>
        /// Stores the bridge object and assigns a new GUID (format "N") as the instance id.
        /// </summary>
        /// <param name="clr2Java">Bridge object that requests are forwarded to.</param>
        public EvaluatorRequestor(IEvaluatorRequestorClr2Java clr2Java)
        {
            InstanceId = Guid.NewGuid().ToString("N");
            Clr2Java = clr2Java;
        }

        /// <summary>
        /// Descriptors registered by <see cref="Submit"/>, keyed by
        /// "{EvaluatorBatchId}_{index}". Always the same dictionary instance;
        /// <see cref="Submit"/> locks on this object while adding entries.
        /// </summary>
        public static Dictionary<string, IEvaluatorDescriptor> Evaluators
        {
            get { return _evaluators; }
        }

        /// <summary>
        /// Resource catalog; nothing in this class assigns it.
        /// </summary>
        public IResourceCatalog ResourceCatalog { get; set; }

        /// <summary>
        /// Identifier of this object; set to a new GUID by the constructor.
        /// </summary>
        [DataMember]
        public string InstanceId { get; set; }

        [DataMember]
        private IEvaluatorRequestorClr2Java Clr2Java { get; set; }

        /// <summary>
        /// Registers one descriptor per requested evaluator and then forwards
        /// the request to the bridge object.
        /// </summary>
        /// <param name="request">The evaluator request to submit.</param>
        /// <exception cref="InvalidOperationException">
        /// Raised through <c>Exceptions.Throw</c> when a key
        /// "{EvaluatorBatchId}_{index}" is already registered. Entries added
        /// earlier in the same call stay registered and the request is not forwarded.
        /// </exception>
        public void Submit(IEvaluatorRequest request)
        {
            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Submitting request for {0} evaluators and {1} MB memory and {2} core to rack {3}.", request.Number, request.MemoryMegaBytes, request.VirtualCore, request.Rack));

            // Same object that the Evaluators property returns, so callers
            // locking on Evaluators are serialized with this block.
            lock (_evaluators)
            {
                for (int i = 0; i < request.Number; i++)
                {
                    EvaluatorDescriptorImpl descriptor = new EvaluatorDescriptorImpl(new NodeDescriptorImpl(), EvaluatorType.CLR, request.MemoryMegaBytes, request.VirtualCore);
                    descriptor.Rack = request.Rack;
                    string key = string.Format(CultureInfo.InvariantCulture, "{0}_{1}", request.EvaluatorBatchId, i);
                    try
                    {
                        _evaluators.Add(key, descriptor);
                    }
                    catch (ArgumentException e)
                    {
                        // Dictionary.Add throws ArgumentException for a duplicate key.
                        Exceptions.Caught(e, Level.Error, string.Format(CultureInfo.InvariantCulture, "EvaluatorBatchId [{0}] already exists.", key), LOGGER);
                        Exceptions.Throw(new InvalidOperationException("Cannot use evaluator id " + key, e), LOGGER);
                    }
                }
            }

            Clr2Java.Submit(request);
        }

        /// <summary>
        /// No resources to release; does nothing.
        /// </summary>
        public void Dispose()
        {
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedContext.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedContext.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedContext.cs new file mode 100644 index 0000000..eb982c6 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedContext.cs @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */

using Org.Apache.REEF.Driver.Bridge.Clr2java;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Utilities;

namespace Org.Apache.REEF.Driver.Bridge.Events
{
    /// <summary>
    /// Failed-context event. The id, evaluator id and parent id are read from
    /// the bridge object once at construction; the evaluator descriptor and
    /// parent context are fetched from the bridge object on each access.
    /// </summary>
    public class FailedContext : IFailedContext
    {
        private readonly string _contextId;

        private readonly string _owningEvaluatorId;

        private readonly string _parentContextId;

        /// <summary>
        /// Captures the three ids from the bridge object and keeps the bridge
        /// object for later lookups.
        /// </summary>
        /// <param name="clr2Java">Bridge object describing the failed context.</param>
        public FailedContext(IFailedContextClr2Java clr2Java)
        {
            _contextId = clr2Java.GetId();
            _owningEvaluatorId = clr2Java.GetEvaluatorId();
            _parentContextId = clr2Java.GetParentId();
            FailedContextClr2Java = clr2Java;
        }

        /// <summary>
        /// Id captured at construction. The setter discards its value.
        /// </summary>
        public string Id
        {
            get { return _contextId; }

            set
            {
                // Intentionally a no-op.
            }
        }

        /// <summary>
        /// Evaluator id captured at construction. The setter discards its value.
        /// </summary>
        public string EvaluatorId
        {
            get { return _owningEvaluatorId; }

            set
            {
                // Intentionally a no-op.
            }
        }

        /// <summary>
        /// Parent id captured at construction, or an empty optional when that
        /// id is null or the empty string. The setter discards its value.
        /// </summary>
        public Optional<string> ParentId
        {
            get
            {
                if (string.IsNullOrEmpty(_parentContextId))
                {
                    return Optional<string>.Empty();
                }

                return Optional<string>.Of(_parentContextId);
            }

            set
            {
                // Intentionally a no-op.
            }
        }

        /// <summary>
        /// Evaluator descriptor as returned by the bridge object. The setter
        /// discards its value.
        /// </summary>
        public IEvaluatorDescriptor EvaluatorDescriptor
        {
            get { return FailedContextClr2Java.GetEvaluatorDescriptor(); }

            set
            {
                // Intentionally a no-op.
            }
        }

        /// <summary>
        /// A new <see cref="ActiveContext"/> wrapping the parent context the
        /// bridge object returns, or an empty optional when it returns null.
        /// </summary>
        public Optional<IActiveContext> ParentContext
        {
            get
            {
                IActiveContextClr2Java parentBridge = FailedContextClr2Java.GetParentContext();
                if (parentBridge == null)
                {
                    return Optional<IActiveContext>.Empty();
                }

                return Optional<IActiveContext>.Of(new ActiveContext(parentBridge));
            }
        }

        private IFailedContextClr2Java FailedContextClr2Java { get; set; }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedEvaluator.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedEvaluator.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedEvaluator.cs new file mode 100644 index 0000000..a63f953 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedEvaluator.cs @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using Org.Apache.REEF.Common.Exceptions;
using Org.Apache.REEF.Driver.Bridge.Clr2java;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Driver.Task;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Driver.Bridge.Events
{
    /// <summary>
    /// Failed-evaluator event backed by an <see cref="IFailedEvaluatorClr2Java"/>
    /// bridge object and the evaluator-requestor bridge object obtained from it.
    /// </summary>
    [DataContract]
    internal class FailedEvaluator : IFailedEvaluator
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(FailedEvaluator));

        /// <summary>
        /// Assigns a new GUID (format "N") as the instance id, stores the
        /// bridge object, and reads the requestor bridge object and the id from it.
        /// </summary>
        /// <param name="clr2Java">Bridge object describing the failed evaluator.</param>
        public FailedEvaluator(IFailedEvaluatorClr2Java clr2Java)
        {
            InstanceId = Guid.NewGuid().ToString("N");
            FailedEvaluatorClr2Java = clr2Java;
            EvaluatorRequestorClr2Java = clr2Java.GetEvaluatorRequestor();
            Id = clr2Java.GetId();
        }

        /// <summary>
        /// Identifier of this object; set to a new GUID by the constructor.
        /// </summary>
        [DataMember]
        public string InstanceId { get; set; }

        /// <summary>
        /// Id read from the bridge object at construction.
        /// </summary>
        public string Id { get; set; }

        /// <summary>
        /// Not assigned anywhere in this class.
        /// </summary>
        public EvaluatorException EvaluatorException { get; set; }

        /// <summary>
        /// Not assigned anywhere in this class.
        /// </summary>
        public List<FailedContext> FailedContexts { get; set; }

        /// <summary>
        /// Not assigned anywhere in this class.
        /// </summary>
        public Optional<IFailedTask> FailedTask { get; set; }

        [DataMember]
        private IFailedEvaluatorClr2Java FailedEvaluatorClr2Java { get; set; }

        [DataMember]
        private IEvaluatorRequestorClr2Java EvaluatorRequestorClr2Java { get; set; }

        /// <summary>
        /// Builds a new <see cref="EvaluatorRequestor"/> around the stored
        /// requestor bridge object. When that bridge object is null, an
        /// <see cref="InvalidOperationException"/> is handed to
        /// <c>Exceptions.Throw</c> first.
        /// </summary>
        /// <returns>A new requestor wrapping the stored bridge object.</returns>
        public IEvaluatorRequestor GetEvaluatorRequetor()
        {
            IEvaluatorRequestorClr2Java requestorBridge = EvaluatorRequestorClr2Java;
            if (requestorBridge == null)
            {
                Exceptions.Throw(new InvalidOperationException("EvaluatorRequestorClr2Java not initialized."), LOGGER);
            }

            return new EvaluatorRequestor(requestorBridge);
        }
    }
}
