// http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/bridge/ClrClientHelper.cs
// ----------------------------------------------------------------------
// diff --git a/lang/cs/Org.Apache.REEF.Driver/bridge/ClrClientHelper.cs b/lang/cs/Org.Apache.REEF.Driver/bridge/ClrClientHelper.cs
// new file mode 100644
// index 0000000..03f385a
// --- /dev/null
// +++ b/lang/cs/Org.Apache.REEF.Driver/bridge/ClrClientHelper.cs
// @@ -0,0 +1,175 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Interface;

namespace Org.Apache.REEF.Driver.bridge
{
    /// <summary>
    /// Client-side helper that serializes the driver bridge configuration, optionally
    /// adds the CLR assemblies to the REEF jar, and launches the bridge process.
    /// </summary>
    public class ClrClientHelper
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ClrClientHelper));

        /// <summary>
        /// Writes <paramref name="driverBridgeConfig"/> to <paramref name="clrFolder"/>, then starts
        /// either <paramref name="runCommand"/> (when the settings request YARN) or the java binary
        /// found under JAVA_HOME, and blocks until that process exits.
        /// </summary>
        /// <param name="appDlls">Application assemblies copied to the driver app directory when submitting.</param>
        /// <param name="driverBridgeConfig">Configuration serialized to the CLR runtime configuration file.</param>
        /// <param name="driverSubmissionSettings">Controls submission, YARN vs. java launch, and Java logging.</param>
        /// <param name="reefJar">Jar that is updated with the assemblies and used as the classpath.</param>
        /// <param name="runCommand">Executable started when running on YARN.</param>
        /// <param name="clrFolder">Folder receiving the configuration file; also passed to the launched process.</param>
        /// <param name="className">Launch class passed to <paramref name="runCommand"/> on YARN.</param>
        public static void Run(HashSet<string> appDlls, IConfiguration driverBridgeConfig, DriverSubmissionSettings driverSubmissionSettings, string reefJar = Constants.BridgeJarFileName, string runCommand = "run.cmd", string clrFolder = ".", string className = Constants.BridgeLaunchClass)
        {
            using (LOGGER.LogFunction("ClrClientHelper::Run"))
            {
                if (driverSubmissionSettings.Submit)
                {
                    ClrHandlerHelper.CopyDllsToAppDirectory(appDlls);
                    UpdateJarFileWithAssemblies(reefJar);
                }

                using (LOGGER.LogScope("ClrClientHelper::serialize driverBridgeConfig to clrRuntimeConfigFile"))
                {
                    string clrRuntimeConfigFile = Path.Combine(clrFolder, Constants.DriverBridgeConfiguration);
                    new AvroConfigurationSerializer().ToFile(driverBridgeConfig, clrRuntimeConfigFile);
                    LOGGER.Log(Level.Info, "CLR driver bridge configurations written to " + clrRuntimeConfigFile);
                }

                ProcessStartInfo startInfo = new ProcessStartInfo();
                if (driverSubmissionSettings.RunOnYarn)
                {
                    startInfo.FileName = runCommand;

                    // NOTE(review): unlike the local branch below, no separator is inserted between
                    // clrFolder and the submission arguments; presumably ToComamndLineArguments()
                    // emits a leading space. Confirm against DriverSubmissionSettings.
                    startInfo.Arguments = className + " " + clrFolder +
                                          driverSubmissionSettings.ToComamndLineArguments();
                }
                else
                {
                    startInfo.FileName = GetJavaBinary();
                    string loggingPrefix = string.Empty;
                    if (driverSubmissionSettings.JavaLogLevel == JavaLoggingSetting.VERBOSE_TO_CLR)
                    {
                        loggingPrefix = Constants.JavaToCLRLoggingConfig + " ";
                    }
                    else if (driverSubmissionSettings.JavaLogLevel == JavaLoggingSetting.VERBOSE)
                    {
                        loggingPrefix = Constants.JavaVerboseLoggingConfig + " ";
                    }

                    // NOTE(review): this branch always launches Constants.BridgeLaunchClass and
                    // ignores the className parameter; kept as-is, confirm whether that is intended.
                    startInfo.Arguments = loggingPrefix + @"-classpath " + reefJar + " " + Constants.BridgeLaunchClass
                                          + " " + clrFolder + " " + driverSubmissionSettings.ToComamndLineArguments();
                }
                startInfo.RedirectStandardOutput = true;
                startInfo.UseShellExecute = false;
                startInfo.CreateNoWindow = false;
                LOGGER.Log(Level.Info, "Executing\r\n" + startInfo.FileName + "\r\n" + startInfo.Arguments);
                using (Process process = Process.Start(startInfo))
                {
                    // stdout is redirected, so it must be drained: otherwise the child blocks as soon
                    // as the pipe buffer fills and WaitForExit() never returns. The output is
                    // discarded, exactly as before.
                    process.BeginOutputReadLine();
                    process.WaitForExit();
                }
            }
        }

        /// <summary>
        /// Adds the assemblies reported by <c>ClrHandlerHelper.GetAssembliesListForReefDriverApp</c>
        /// to <paramref name="reefJar"/> by running <c>jar uf</c>.
        /// </summary>
        /// <param name="reefJar">Path of the jar to update; must exist.</param>
        /// <exception cref="InvalidOperationException">
        /// The jar file does not exist, or the jar tool exits with a non-zero code.
        /// </exception>
        public static void UpdateJarFileWithAssemblies(string reefJar)
        {
            using (LOGGER.LogFunction("ClrClientHelper::UpdateJarFileWithAssemblies"))
            {
                string assembliesList = ClrHandlerHelper.GetAssembliesListForReefDriverApp();
                if (!File.Exists(reefJar))
                {
                    throw new InvalidOperationException("cannot find reef jar file: " + reefJar);
                }

                ProcessStartInfo startInfo = CreateJarToolStartInfo(@"uf " + reefJar + " " + assembliesList);
                LOGGER.Log(Level.Info, "updating jar file with \r\n" + startInfo.FileName + "\r\n" + startInfo.Arguments);
                RunJarTool(startInfo, "Failed to update jar file with stdout :");
                LOGGER.Log(Level.Info, "jar file updated.");
            }
        }

        /// <summary>
        /// Extracts the named entries from <paramref name="reefJar"/> by running <c>jar xf</c>.
        /// </summary>
        /// <param name="reefJar">Path of the jar to read.</param>
        /// <param name="configFiles">Entry names to extract; joined with single spaces on the command line.</param>
        /// <param name="dropFolder">Not used by this method.</param>
        /// <exception cref="InvalidOperationException">The jar tool exits with a non-zero code.</exception>
        public static void ExtractConfigfileFromJar(string reefJar, IList<string> configFiles, string dropFolder)
        {
            var configFileNames = string.Join(" ", configFiles.ToArray());
            ProcessStartInfo startInfo = CreateJarToolStartInfo(@"xf " + reefJar + " " + configFileNames);

            LOGGER.Log(Level.Info, "extracting files from jar file with \r\n" + startInfo.FileName + "\r\n" + startInfo.Arguments);
            RunJarTool(startInfo, "Failed to extract files from jar file with stdout :");
            LOGGER.Log(Level.Info, "files are extracted.");
        }

        /// <summary>Builds the start info for a windowless jar tool invocation with both output streams redirected.</summary>
        private static ProcessStartInfo CreateJarToolStartInfo(string arguments)
        {
            return new ProcessStartInfo()
            {
                FileName = GetJarBinary(),
                Arguments = arguments,
                RedirectStandardOutput = true,
                RedirectStandardError = true,
                UseShellExecute = false,
                CreateNoWindow = true
            };
        }

        /// <summary>
        /// Runs the process to completion and throws <see cref="InvalidOperationException"/> carrying the
        /// captured stdout and stderr when it exits with a non-zero code.
        /// </summary>
        private static void RunJarTool(ProcessStartInfo startInfo, string failurePrefix)
        {
            StringBuilder error = new StringBuilder();
            using (Process process = Process.Start(startInfo))
            {
                // Reading both redirected streams synchronously, one after the other, can deadlock:
                // the child may block writing to stderr while we are blocked reading stdout.
                // stderr is therefore collected asynchronously while stdout is read to the end.
                process.ErrorDataReceived += (sender, e) =>
                {
                    if (e.Data != null)
                    {
                        error.AppendLine(e.Data);
                    }
                };
                process.BeginErrorReadLine();
                string output = process.StandardOutput.ReadToEnd();

                // The parameterless WaitForExit also waits for the asynchronous stderr events.
                process.WaitForExit();
                if (process.ExitCode != 0)
                {
                    throw new InvalidOperationException(failurePrefix + output + "and stderr:" + error);
                }
            }
        }

        /// <summary>Returns the path of <c>jar.exe</c> under JAVA_HOME.</summary>
        private static string GetJarBinary()
        {
            return GetJavaHomeBinary("jar.exe");
        }

        /// <summary>Returns the path of <c>java.exe</c> under JAVA_HOME.</summary>
        private static string GetJavaBinary()
        {
            return GetJavaHomeBinary("java.exe");
        }

        /// <summary>
        /// Resolves <paramref name="executableName"/> inside the <c>bin</c> folder of JAVA_HOME.
        /// Terminates the process with exit code 1 when JAVA_HOME is not set.
        /// </summary>
        private static string GetJavaHomeBinary(string executableName)
        {
            string javaHome = Environment.GetEnvironmentVariable("JAVA_HOME");
            if (string.IsNullOrWhiteSpace(javaHome))
            {
                LOGGER.Log(Level.Error, "JAVA_HOME not set. Please set JAVA_HOME environment variable first. Exiting...");
                Environment.Exit(1);
            }
            return Path.Combine(javaHome, "bin", executableName);
        }
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/bridge/ClrHandlerHelper.cs
// ----------------------------------------------------------------------
// diff --git a/lang/cs/Org.Apache.REEF.Driver/bridge/ClrHandlerHelper.cs b/lang/cs/Org.Apache.REEF.Driver/bridge/ClrHandlerHelper.cs
// new file mode 100644
// index 0000000..c6dd02f
// --- /dev/null
// +++ b/lang/cs/Org.Apache.REEF.Driver/bridge/ClrHandlerHelper.cs
// @@ -0,0 +1,178 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Tang.Exceptions;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Implementations;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Protobuf;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Runtime.InteropServices;
using Org.Apache.REEF.Tang.Implementations.Tang;

namespace Org.Apache.REEF.Driver.Bridge
{
    /// <summary>
    /// Static helpers for the CLR side of the driver bridge: GC-handle management for handler
    /// objects, reading the bridge configuration, and staging assemblies in the driver app directory.
    /// </summary>
    public class ClrHandlerHelper
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ClrHandlerHelper));

        /// <summary>File names of the assemblies that are always added to the driver app directory.</summary>
        public static string[] ReefAssemblies
        {
            get
            {
                return new[] { "Microsoft.Hadoop.Avro.dll", "Org.Apache.REEF.Driver.dll", "Org.Apache.REEF.Common.dll", "Org.Apache.REEF.Utilities.dll", "Org.Apache.REEF.IO.Network.dll", "Org.Apache.REEF.Tang.dll", "Org.Apache.REEF.Wake.dll", "Newtonsoft.Json.dll", "protobuf-net.dll" };
            }
        }

        internal static int MemoryGranularity { get; set; }

        /// <summary>
        /// Allocates a GC handle for <paramref name="handler"/> and returns it as an integer.
        /// The handle stays allocated until <see cref="FreeHandle"/> is called with the returned value.
        /// </summary>
        public static ulong CreateHandler(object handler)
        {
            GCHandle gc = GCHandle.Alloc(handler);
            IntPtr intPtr = GCHandle.ToIntPtr(gc);
            ulong ul = (ulong)intPtr.ToInt64();
            return ul;
        }

        /// <summary>
        /// Frees a handle previously returned by <see cref="CreateHandler"/>.
        /// The sentinel returned by <see cref="CreateNullHandler"/> is ignored.
        /// </summary>
        public static void FreeHandle(ulong handle)
        {
            // The null sentinel never came from GCHandle.Alloc; converting it back would throw.
            if (handle == Constants.NullHandler)
            {
                return;
            }
            GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle);
            gc.Free();
        }

        /// <summary>Sets <see cref="MemoryGranularity"/>.</summary>
        /// <param name="granularity">Must be strictly positive.</param>
        public static void SetMemoryGranuality(int granularity)
        {
            if (granularity <= 0)
            {
                var e = new ArgumentException("granularity must be a positive value, provided: " + granularity);
                Exceptions.Throw(e, LOGGER);
            }
            MemoryGranularity = granularity;
        }

        /// <summary>Returns the sentinel value that stands for "no handler".</summary>
        public static ulong CreateNullHandler()
        {
            return Constants.NullHandler;
        }

        /// <summary>
        /// Reads the driver bridge configuration from <c>reef/global</c> under the current directory
        /// and returns the injected command line arguments.
        /// </summary>
        /// <returns>
        /// The injected arguments, or an empty set when injection fails with an <c>InjectionException</c>.
        /// </returns>
        public static ISet<string> GetCommandLineArguments()
        {
            using (LOGGER.LogFunction("ClrHandlerHelper::GetCommandLineArguments"))
            {
                string bridgeConfiguration = Path.Combine(Directory.GetCurrentDirectory(), "reef", "global",
                                                          Constants.DriverBridgeConfiguration);

                if (!File.Exists(bridgeConfiguration))
                {
                    string error = "Configuraiton file not found: " + bridgeConfiguration;
                    LOGGER.Log(Level.Error, error);
                    Exceptions.Throw(new InvalidOperationException(error), LOGGER);
                }
                CommandLineArguments arguments;
                IInjector injector;
                try
                {
                    IConfiguration driverBridgeConfiguration =
                        new AvroConfigurationSerializer().FromFile(bridgeConfiguration);
                    injector = TangFactory.GetTang().NewInjector(driverBridgeConfiguration);
                    arguments = injector.GetInstance<CommandLineArguments>();
                }
                catch (InjectionException e)
                {
                    string error = "Cannot inject command line arguments from driver bridge configuration. ";
                    Exceptions.Caught(e, Level.Error, error, LOGGER);
                    // return empty string set
                    return new HashSet<string>();
                }
                return arguments.Arguments;
            }
        }

        /// <summary>
        /// Writes the given class paths, comma separated, to the user-supplied Java libraries file
        /// in the current directory, replacing any previous content.
        /// </summary>
        public static void SupplyAdditionalClassPath(params string[] classPaths)
        {
            string path = Path.Combine(Directory.GetCurrentDirectory(), Constants.GlobalUserSuppliedJavaLibraries);
            File.Delete(path);
            File.WriteAllText(path, string.Join(",", classPaths));
        }

        /// <summary>
        /// Builds the class hierarchy of the given assemblies and serializes it to the class hierarchy file.
        /// </summary>
        public static void GenerateClassHierarchy(HashSet<string> clrDlls)
        {
            using (LOGGER.LogFunction("ClrHandlerHelper::GenerateClassHierarchy"))
            {
                IClassHierarchy ns = TangFactory.GetTang().GetClassHierarchy(clrDlls.ToArray());
                ProtocolBufferClassHierarchy.Serialize(Constants.ClassHierarachyBin, ns);

                LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Class hierarchy written to [{0}].", Path.Combine(Directory.GetCurrentDirectory(), Constants.ClassHierarachyBin)));
            }
        }

        /// <summary>
        /// Copies every assembly in <see cref="ReefAssemblies"/> from the current directory into the
        /// driver app directory and returns a space-separated list of quoted, app-directory-relative
        /// paths of all assemblies found there.
        /// </summary>
        /// <returns>Each assembly appears once, even if it was already present in the app directory.</returns>
        public static string GetAssembliesListForReefDriverApp()
        {
            using (LOGGER.LogFunction("ClrHandlerHelper::GetAssembliesListForReefDriverApp"))
            {
                string executionDirectory = Directory.GetCurrentDirectory();
                IList<string> assemblies =
                    Directory.GetFiles(Path.Combine(executionDirectory, Constants.DriverAppDirectory), "*.dll")
                             .Select(f => QuoteAppRelativePath(Path.GetFileName(f)))
                             .ToList();

                foreach (string reefAssembly in ReefAssemblies)
                {
                    if (!File.Exists(reefAssembly))
                    {
                        var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Assembly [{0}] needed for REEF driver not found in {1}", reefAssembly, executionDirectory));
                        Exceptions.Throw(e, LOGGER);
                    }
                    File.Copy(reefAssembly, Path.Combine(executionDirectory, Constants.DriverAppDirectory, reefAssembly), overwrite: true);
                    assemblies.Add(QuoteAppRelativePath(reefAssembly));
                }

                // A REEF assembly that was already in the app directory is picked up by the
                // directory scan and again by the loop above; list it only once.
                return string.Join(" ", assemblies.Distinct(StringComparer.OrdinalIgnoreCase));
            }
        }

        /// <summary>
        /// Copies the given assemblies into the driver app directory under the current directory,
        /// creating that directory if needed. ".dll" is appended to names that lack it.
        /// </summary>
        /// <param name="dlls">Assembly names or paths; each file lands directly in the app directory.</param>
        public static void CopyDllsToAppDirectory(HashSet<string> dlls)
        {
            using (LOGGER.LogFunction("ClrHandlerHelper::CopyDllsToAppDirectory"))
            {
                if (dlls == null)
                {
                    Exceptions.Throw(new ArgumentNullException("dlls"), LOGGER);
                }

                string executionDirectory = Directory.GetCurrentDirectory();
                string appDirectory = Path.Combine(executionDirectory, Constants.DriverAppDirectory);
                Directory.CreateDirectory(appDirectory);
                foreach (string dll in dlls)
                {
                    string dllFile = dll;
                    if (!dll.EndsWith(".dll", StringComparison.OrdinalIgnoreCase))
                    {
                        dllFile += ".dll";
                    }
                    if (!File.Exists(dllFile))
                    {
                        var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Assembly [{0}] for REEF application not found in {1}", dllFile, executionDirectory));
                        Exceptions.Throw(e, LOGGER);
                    }

                    // Use only the file name for the destination: combining with a rooted path
                    // would yield the source path itself, and a relative sub-path would point
                    // into a sub-directory that does not exist.
                    File.Copy(dllFile, Path.Combine(appDirectory, Path.GetFileName(dllFile)), overwrite: true);
                }
            }
        }

        /// <summary>Returns the quoted path of <paramref name="fileName"/> relative to the driver app directory.</summary>
        private static string QuoteAppRelativePath(string fileName)
        {
            return string.Format(CultureInfo.InvariantCulture, "\"{0}\"", Constants.DriverAppDirectory + @"\" + fileName);
        }
    }
}
// \ No newline at end of file
// http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/bridge/ClrSystemHandler.cs
// ----------------------------------------------------------------------
// diff --git a/lang/cs/Org.Apache.REEF.Driver/bridge/ClrSystemHandler.cs b/lang/cs/Org.Apache.REEF.Driver/bridge/ClrSystemHandler.cs
// new file mode 100644
// index 0000000..15b957d
// --- /dev/null
// +++
// b/lang/cs/Org.Apache.REEF.Driver/bridge/ClrSystemHandler.cs
// @@ -0,0 +1,53 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.Collections.Generic;

namespace Org.Apache.REEF.Driver.Bridge
{
    /// <summary>
    /// Fans a single event stream out to every subscribed user handler.
    /// </summary>
    /// <typeparam name="T">Type of the event delivered to the handlers.</typeparam>
    public class ClrSystemHandler<T> : IObserver<T>, IObservable<T>
    {
        private readonly object _gate = new object();

        private readonly List<IObserver<T>> userHandlers = new List<IObserver<T>>();

        /// <summary>
        /// Delivers <paramref name="value"/> to every handler subscribed at the time of the call,
        /// in subscription order.
        /// </summary>
        public void OnNext(T value)
        {
            // Iterate over a snapshot so that a handler may subscribe or unsubscribe from inside
            // its callback without invalidating the enumeration.
            IObserver<T>[] snapshot;
            lock (_gate)
            {
                snapshot = userHandlers.ToArray();
            }

            foreach (IObserver<T> observer in snapshot)
            {
                observer.OnNext(value);
            }
        }

        /// <summary>Not supported; always throws <see cref="NotImplementedException"/>.</summary>
        public void OnError(Exception error)
        {
            throw new NotImplementedException();
        }

        /// <summary>Not supported; always throws <see cref="NotImplementedException"/>.</summary>
        public void OnCompleted()
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// Registers <paramref name="observer"/> to receive subsequent <see cref="OnNext"/> values.
        /// </summary>
        /// <param name="observer">Handler to register; must not be null.</param>
        /// <returns>
        /// A token that removes this registration when disposed. Disposing more than once is harmless.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            lock (_gate)
            {
                userHandlers.Add(observer);
            }
            return new Subscription(this, observer);
        }

        /// <summary>Removes one registration from its owning handler when disposed.</summary>
        private sealed class Subscription : IDisposable
        {
            private readonly IObserver<T> _observer;

            private ClrSystemHandler<T> _owner;

            public Subscription(ClrSystemHandler<T> owner, IObserver<T> observer)
            {
                _owner = owner;
                _observer = observer;
            }

            public void Dispose()
            {
                // Clear the owner first so that a second Dispose cannot remove another
                // registration of the same observer.
                ClrSystemHandler<T> owner = System.Threading.Interlocked.Exchange(ref _owner, null);
                if (owner == null)
                {
                    return;
                }

                lock (owner._gate)
                {
                    owner.userHandlers.Remove(_observer);
                }
            }
        }
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/bridge/ClrSystemHandlerWrapper.cs
// ----------------------------------------------------------------------
// diff --git a/lang/cs/Org.Apache.REEF.Driver/bridge/ClrSystemHandlerWrapper.cs b/lang/cs/Org.Apache.REEF.Driver/bridge/ClrSystemHandlerWrapper.cs
// new file mode 100644
// index 0000000..58deabc
// --- /dev/null
// +++
// b/lang/cs/Org.Apache.REEF.Driver/bridge/ClrSystemHandlerWrapper.cs
// @@ -0,0 +1,274 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.Globalization;
using System.IO;
using System.Runtime.InteropServices;
using Org.Apache.REEF.Common.Context;
using Org.Apache.REEF.Driver.bridge;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Driver.Task;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Implementations;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Wake.Time;
using Org.Apache.REEF.Tang.Implementations.Tang;

namespace Org.Apache.REEF.Driver.Bridge
{
    /// <summary>
    /// Static entry points that turn an integer handle back into the
    /// <c>ClrSystemHandler</c> it was created from and push one event into it.
    /// </summary>
    public class ClrSystemHandlerWrapper
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ClrSystemHandlerWrapper));

        private static DriverBridge _driverBridge;

        /// <summary>Wraps the bridge object in an AllocatedEvaluator and dispatches it.</summary>
        public static void Call_ClrSystemAllocatedEvaluatorHandler_OnNext(ulong handle, IAllocatedEvaluaotrClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemAllocatedEvaluatorHandler_OnNext"))
            {
                Resolve<IAllocatedEvaluator>(handle).OnNext(new AllocatedEvaluator(clr2Java));
            }
        }

        /// <summary>Wraps the bridge object in an ActiveContext and dispatches it.</summary>
        public static void Call_ClrSystemActiveContextHandler_OnNext(ulong handle, IActiveContextClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemActiveContextHandler_OnNext"))
            {
                Resolve<IActiveContext>(handle).OnNext(new ActiveContext(clr2Java));
            }
        }

        /// <summary>Wraps the bridge object in an ActiveContext and dispatches it to the driver-restart handler.</summary>
        public static void Call_ClrSystemDriverRestartActiveContextHandler_OnNext(ulong handle, IActiveContextClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemDriverRestartActiveContextHandler_OnNext"))
            {
                Resolve<IActiveContext>(handle).OnNext(new ActiveContext(clr2Java));
            }
        }

        /// <summary>Wraps the bridge object in an EvaluatorRequestor and dispatches it.</summary>
        public static void Call_ClrSystemEvaluatorRequestor_OnNext(ulong handle, IEvaluatorRequestorClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemEvaluatorRequestor_OnNext"))
            {
                Resolve<IEvaluatorRequestor>(handle).OnNext(new EvaluatorRequestor(clr2Java));
            }
        }

        /// <summary>Wraps the bridge object and message bytes in a TaskMessage and dispatches it.</summary>
        public static void Call_ClrSystemTaskMessage_OnNext(ulong handle, ITaskMessageClr2Java clr2Java, byte[] message)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemTaskMessage_OnNext"))
            {
                Resolve<ITaskMessage>(handle).OnNext(new TaskMessage(clr2Java, message));
            }
        }

        /// <summary>Wraps the bridge object in a FailedTask and dispatches it.</summary>
        public static void Call_ClrSystemFailedTask_OnNext(ulong handle, IFailedTaskClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemFailedTask_OnNext"))
            {
                Resolve<IFailedTask>(handle).OnNext(new FailedTask(clr2Java));
            }
        }

        /// <summary>Wraps the bridge object in a RunningTask and dispatches it.</summary>
        public static void Call_ClrSystemRunningTask_OnNext(ulong handle, IRunningTaskClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemRunningTask_OnNext"))
            {
                Resolve<IRunningTask>(handle).OnNext(new RunningTask(clr2Java));
            }
        }

        /// <summary>Wraps the bridge object in a RunningTask and dispatches it to the driver-restart handler.</summary>
        public static void Call_ClrSystemDriverRestartRunningTask_OnNext(ulong handle, IRunningTaskClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemDriverRestartRunningTask_OnNext"))
            {
                Resolve<IRunningTask>(handle).OnNext(new RunningTask(clr2Java));
            }
        }

        /// <summary>Wraps the bridge object in a FailedEvaluator and dispatches it.</summary>
        public static void Call_ClrSystemFailedEvaluator_OnNext(ulong handle, IFailedEvaluatorClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemFailedEvaluator_OnNext"))
            {
                Resolve<IFailedEvaluator>(handle).OnNext(new FailedEvaluator(clr2Java));
            }
        }

        /// <summary>Wraps the bridge object in a CompletedTask and dispatches it.</summary>
        public static void Call_ClrSystemCompletedTask_OnNext(ulong handle, ICompletedTaskClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemCompletedTask_OnNext"))
            {
                Resolve<ICompletedTask>(handle).OnNext(new CompletedTask(clr2Java));
            }
        }

        /// <summary>Wraps the bridge object in a SuspendedTask and dispatches it.</summary>
        public static void Call_ClrSystemSuspendedTask_OnNext(ulong handle, ISuspendedTaskClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemSuspendedTask_OnNext"))
            {
                Resolve<ISuspendedTask>(handle).OnNext(new SuspendedTask(clr2Java));
            }
        }

        /// <summary>Wraps the bridge object in a CompletedEvaluator and dispatches it.</summary>
        public static void Call_ClrSystemCompletedEvaluator_OnNext(ulong handle, ICompletedEvaluatorClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemCompletedEvaluator_OnNext"))
            {
                Resolve<ICompletedEvaluator>(handle).OnNext(new CompletedEvaluator(clr2Java));
            }
        }

        /// <summary>Wraps the bridge object in an HttpMessage and dispatches it.</summary>
        public static void Call_ClrSystemHttpServer_OnNext(ulong handle, IHttpServerBridgeClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemHttpServer_OnNext"))
            {
                Resolve<IHttpMessage>(handle).OnNext(new HttpMessage(clr2Java));
            }
        }

        /// <summary>Wraps the bridge object in a ClosedContext and dispatches it.</summary>
        public static void Call_ClrSystemClosedContext_OnNext(ulong handle, IClosedContextClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemClosedContext_OnNext"))
            {
                Resolve<IClosedContext>(handle).OnNext(new ClosedContext(clr2Java));
            }
        }

        /// <summary>Wraps the bridge object in a FailedContext and dispatches it.</summary>
        public static void Call_ClrSystemFailedContext_OnNext(ulong handle, IFailedContextClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemFailedContext_OnNext"))
            {
                Resolve<IFailedContext>(handle).OnNext(new FailedContext(clr2Java));
            }
        }

        /// <summary>Wraps the bridge object in a ContextMessage and dispatches it.</summary>
        public static void Call_ClrSystemContextMessage_OnNext(ulong handle, IContextMessageClr2Java clr2Java)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemContextMessage_OnNext"))
            {
                Resolve<IContextMessage>(handle).OnNext(new ContextMessage(clr2Java));
            }
        }

        /// <summary>Dispatches a StartTime built from the current local clock ticks.</summary>
        public static void Call_ClrSystemDriverRestart_OnNext(ulong handle)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemDriverRestart_OnNext"))
            {
                Resolve<StartTime>(handle).OnNext(new StartTime(DateTime.Now.Ticks));
            }
        }

        //Deprecate, remove after both Java and C# code gets checked in
        /// <summary>Logs the start time and returns the handler handles, with no HTTP server port.</summary>
        public static ulong[] Call_ClrSystemStartHandler_OnStart(DateTime startTime)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart"))
            {
                LOGGER.Log(Level.Info, "*** Start time is " + startTime);
                return GetHandlers(null);
            }
        }

        /// <summary>Logs the start time and port, then returns the handler handles.</summary>
        public static ulong[] Call_ClrSystemStartHandler_OnStart(DateTime startTime, string httpServerPort)
        {
            using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart"))
            {
                LOGGER.Log(Level.Info, "*** Start time is " + startTime);
                LOGGER.Log(Level.Info, "*** httpServerPort: " + httpServerPort);
                return GetHandlers(httpServerPort);
            }
        }

        /// <summary>Converts an integer handle back into the system handler it refers to.</summary>
        private static ClrSystemHandler<T> Resolve<T>(ulong handle)
        {
            GCHandle gcHandle = GCHandle.FromIntPtr((IntPtr)handle);
            return (ClrSystemHandler<T>)gcHandle.Target;
        }

        /// <summary>
        /// Loads the driver bridge configuration from <c>reef/global</c> under the current directory,
        /// injects the HTTP port, start handler and DriverBridge, and returns the bridge's handles.
        /// </summary>
        /// <param name="httpServerPortNumber">Port as text; null is treated as port 0.</param>
        private static ulong[] GetHandlers(string httpServerPortNumber)
        {
            string configurationPath = Path.Combine(Directory.GetCurrentDirectory(), "reef", "global", Constants.DriverBridgeConfiguration);
            if (!File.Exists(configurationPath))
            {
                string missingFileMessage = "Cannot find CLR Driver bridge configuration file " + configurationPath;
                Exceptions.Throw(new InvalidOperationException(missingFileMessage), LOGGER);
            }

            IInjector injector = null;
            try
            {
                // The configuration is read before the Tang instance is requested.
                IConfiguration loadedConfiguration = new AvroConfigurationSerializer().FromFile(configurationPath);
                injector = TangFactory.GetTang().NewInjector(loadedConfiguration);
            }
            catch (Exception e)
            {
                string injectorFailureMessage = "Failed to get injector from driver bridge configuration.";
                Exceptions.CaughtAndThrow(new InvalidOperationException(injectorFailureMessage, e), Level.Error, injectorFailureMessage, LOGGER);
            }

            try
            {
                HttpServerPort serverPort = injector.GetInstance<HttpServerPort>();
                if (httpServerPortNumber == null)
                {
                    serverPort.PortNumber = 0;
                }
                else
                {
                    serverPort.PortNumber = int.Parse(httpServerPortNumber, CultureInfo.InvariantCulture);
                }

                IStartHandler startHandler = injector.GetInstance<IStartHandler>();
                LOGGER.Log(Level.Info, "Start handler set to be " + startHandler.Identifier);
                _driverBridge = injector.GetInstance<DriverBridge>();
            }
            catch (Exception e)
            {
                Exceptions.CaughtAndThrow(e, Level.Error, "Cannot get instance.", LOGGER);
            }

            return _driverBridge.Subscribe();
        }
    }
}
// \ No newline at end of file
// http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/bridge/DriverBridge.cs
// ----------------------------------------------------------------------
// diff --git a/lang/cs/Org.Apache.REEF.Driver/bridge/DriverBridge.cs b/lang/cs/Org.Apache.REEF.Driver/bridge/DriverBridge.cs
// new file mode 100644
// index 0000000..314aa6a
// --- /dev/null
// +++ b/lang/cs/Org.Apache.REEF.Driver/bridge/DriverBridge.cs
// @@ -0,0 +1,321 @@
// +/**
// + * Licensed to the Apache Software Foundation (ASF) under one
// + * or more contributor license agreements. See the NOTICE file
// + * distributed with this work for additional information
// + * regarding copyright ownership. The ASF licenses this file
// + * to you under the Apache License, Version 2.0 (the
// + * "License"); you may not use this file except in compliance
// + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Common.Context; +using Org.Apache.REEF.Driver.Context; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Tang.Annotations; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Globalization; +using System.Linq; + +using Org.Apache.REEF.Wake.Time; + +namespace Org.Apache.REEF.Driver.Bridge +{ + public class DriverBridge + { + private static Logger _logger; + + private static ClrSystemHandler<IAllocatedEvaluator> _allocatedEvaluatorSubscriber; + + private static ClrSystemHandler<IEvaluatorRequestor> _evaluatorRequestorSubscriber; + + private static ClrSystemHandler<ITaskMessage> _taskMessageSubscriber; + + private static ClrSystemHandler<IActiveContext> _activeContextSubscriber; + + private static ClrSystemHandler<IActiveContext> _driverRestartActiveContextSubscriber; + + private static ClrSystemHandler<IFailedTask> _failedTaskSubscriber; + + private static ClrSystemHandler<IRunningTask> _runningTaskSubscriber; + + private static ClrSystemHandler<IRunningTask> _driverRestartRunningTaskSubscriber; + + private static ClrSystemHandler<ISuspendedTask> _suspendedTaskSubscriber; + + private static ClrSystemHandler<IFailedEvaluator> _failedEvaluatorSubscriber; + + private static ClrSystemHandler<ICompletedEvaluator> _completedEvaluatorSubscriber; + + private static ClrSystemHandler<IHttpMessage> _httpServerEventSubscriber; + + private static ClrSystemHandler<ICompletedTask> _completedTaskSubscriber; 
+ + private static ClrSystemHandler<IClosedContext> _closedContextSubscriber; + + private static ClrSystemHandler<IFailedContext> _failedContextSubscriber; + + private static ClrSystemHandler<IContextMessage> _contextMessageSubscriber; + + private static ClrSystemHandler<StartTime> _driverRestartSubscriber; + + private IObserver<StartTime> _driverRestartHandler; + + private ISet<IObserver<IEvaluatorRequestor>> _evaluatorRequestHandlers; + + private ISet<IObserver<IAllocatedEvaluator>> _allocatedEvaluatorHandlers; + + private ISet<IObserver<IActiveContext>> _activeContextHandlers; + + private ISet<IObserver<IActiveContext>> _driverRestartActiveContextHandlers; + + private ISet<IObserver<ITaskMessage>> _taskMessageHandlers; + + private ISet<IObserver<IFailedTask>> _failedTaskHandlers; + + private ISet<IObserver<ISuspendedTask>> _suspendedTaskHandlers; + + private ISet<IObserver<IRunningTask>> _runningTaskHandlers; + + private ISet<IObserver<IRunningTask>> _driverRestartRunningTaskHandlers; + + private ISet<IObserver<IFailedEvaluator>> _failedEvaluatorHandlers; + + private ISet<IObserver<ICompletedEvaluator>> _completedEvaluatorHandlers; + + private ISet<IObserver<IClosedContext>> _closedContextHandlers; + + private ISet<IObserver<IFailedContext>> _failedContextHandlers; + + private ISet<IObserver<IContextMessage>> _contextMessageHandlers; + + private ISet<IObserver<ICompletedTask>> _completedTaskHandlers; + + private HttpServerHandler _httpServerHandler; + + [Inject] + public DriverBridge( + [Parameter(Value = typeof(DriverBridgeConfigurationOptions.DriverRestartHandler))] IObserver<StartTime> driverRestartHandler, + [Parameter(Value = typeof(DriverBridgeConfigurationOptions.EvaluatorRequestHandlers))] ISet<IObserver<IEvaluatorRequestor>> evaluatorRequestHandlers, + [Parameter(Value = typeof(DriverBridgeConfigurationOptions.AllocatedEvaluatorHandlers))] ISet<IObserver<IAllocatedEvaluator>> allocatedEvaluatorHandlers, + [Parameter(Value = 
typeof(DriverBridgeConfigurationOptions.ActiveContextHandlers))] ISet<IObserver<IActiveContext>> activeContextHandlers, + [Parameter(Value = typeof(DriverBridgeConfigurationOptions.TaskMessageHandlers))] ISet<IObserver<ITaskMessage>> taskMessageHandlers, + [Parameter(Value = typeof(DriverBridgeConfigurationOptions.FailedTaskHandlers))] ISet<IObserver<IFailedTask>> failedTaskHandlers, + [Parameter(Value = typeof(DriverBridgeConfigurationOptions.FailedEvaluatorHandlers))] ISet<IObserver<IFailedEvaluator>> failedEvaluatorHandlers, + [Parameter(Value = typeof(DriverBridgeConfigurationOptions.CompletedEvaluatorHandlers))] ISet<IObserver<ICompletedEvaluator>> completedEvaluatorHandlers, + [Parameter(Value = typeof(DriverBridgeConfigurationOptions.RunningTaskHandlers))] ISet<IObserver<IRunningTask>> runningTaskHandlers, + [Parameter(Value = typeof(DriverBridgeConfigurationOptions.CompletedTaskHandlers))] ISet<IObserver<ICompletedTask>> completedTaskHandlers, + [Parameter(Value = typeof(DriverBridgeConfigurationOptions.SuspendedTaskHandlers))] ISet<IObserver<ISuspendedTask>> suspendedTaskHandlers, + [Parameter(Value = typeof(DriverBridgeConfigurationOptions.ClosedContextHandlers))] ISet<IObserver<IClosedContext>> closedContextHandlers, + [Parameter(Value = typeof(DriverBridgeConfigurationOptions.FailedContextHandlers))] ISet<IObserver<IFailedContext>> failedContextHandlers, + [Parameter(Value = typeof(DriverBridgeConfigurationOptions.ContextMessageHandlers))] ISet<IObserver<IContextMessage>> contextMessageHandlers, + [Parameter(Value = typeof(DriverBridgeConfigurationOptions.DriverRestartActiveContextHandlers))] ISet<IObserver<IActiveContext>> driverRestartActiveContextHandlers, + [Parameter(Value = typeof(DriverBridgeConfigurationOptions.DriverRestartRunningTaskHandlers))] ISet<IObserver<IRunningTask>> driverRestartRunningTaskHandlers, + [Parameter(Value = typeof(DriverBridgeConfigurationOptions.TraceListenersSet))] ISet<TraceListener> traceListeners, + [Parameter(Value = 
typeof(DriverBridgeConfigurationOptions.TraceLevel))] string traceLevel, + HttpServerHandler httpServerHandler) + { + foreach (TraceListener listener in traceListeners) + { + Logger.AddTraceListner(listener); + } + _logger = Logger.GetLogger(typeof(DriverBridge)); + _logger.Log(Level.Info, "Constructing DriverBridge"); + + Level level; + if (!Enum.TryParse(traceLevel.ToString(CultureInfo.InvariantCulture), out level)) + { + _logger.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Invalid trace level {0} provided, will by default use verbose level", traceLevel)); + } + else + { + Logger.SetCustomLevel(level); + } + + _evaluatorRequestHandlers = evaluatorRequestHandlers; + _allocatedEvaluatorHandlers = allocatedEvaluatorHandlers; + _activeContextHandlers = activeContextHandlers; + _taskMessageHandlers = taskMessageHandlers; + _failedEvaluatorHandlers = failedEvaluatorHandlers; + _failedTaskHandlers = failedTaskHandlers; + _completedTaskHandlers = completedTaskHandlers; + _runningTaskHandlers = runningTaskHandlers; + _suspendedTaskHandlers = suspendedTaskHandlers; + _completedEvaluatorHandlers = completedEvaluatorHandlers; + _closedContextHandlers = closedContextHandlers; + _failedContextHandlers = failedContextHandlers; + _contextMessageHandlers = contextMessageHandlers; + _driverRestartHandler = driverRestartHandler; + _driverRestartActiveContextHandlers = driverRestartActiveContextHandlers; + _driverRestartRunningTaskHandlers = driverRestartRunningTaskHandlers; + _httpServerHandler = httpServerHandler; + + _evaluatorRequestorSubscriber = new ClrSystemHandler<IEvaluatorRequestor>(); + _allocatedEvaluatorSubscriber = new ClrSystemHandler<IAllocatedEvaluator>(); + _completedEvaluatorSubscriber = new ClrSystemHandler<ICompletedEvaluator>(); + _taskMessageSubscriber = new ClrSystemHandler<ITaskMessage>(); + _activeContextSubscriber = new ClrSystemHandler<IActiveContext>(); + _failedTaskSubscriber = new ClrSystemHandler<IFailedTask>(); + 
/// <summary>
/// Wires every configured CLR handler to its bridge-side subscriber and
/// returns the table of native handles created for those subscribers.
/// </summary>
/// <returns>
/// An array of <c>Constants.HandlersNumber</c> entries, pre-filled with
/// <c>Constants.NullHandler</c>; the slot of each event type (looked up
/// through <c>Constants.Handlers</c>) holds the value returned by
/// <c>ClrHandlerHelper.CreateHandler</c> for that event's subscriber.
/// </returns>
public ulong[] Subscribe()
{
    ulong[] handlers = Enumerable.Repeat(Constants.NullHandler, Constants.HandlersNumber).ToArray();

    // Driver restart is a single handler rather than a set, so it is wired by hand.
    _driverRestartSubscriber.Subscribe(_driverRestartHandler);
    _logger.Log(Level.Info, "subscribed to Driver restart handler: " + _driverRestartHandler);
    handlers[Constants.Handlers[Constants.DriverRestartHandler]] = ClrHandlerHelper.CreateHandler(_driverRestartSubscriber);

    // Set-valued handlers. The registration order below is the original one.
    handlers[Constants.Handlers[Constants.EvaluatorRequestorHandler]] =
        SubscribeAll(_evaluatorRequestorSubscriber, _evaluatorRequestHandlers, "subscribed to IEvaluatorRequestor handler: ");

    handlers[Constants.Handlers[Constants.AllocatedEvaluatorHandler]] =
        SubscribeAll(_allocatedEvaluatorSubscriber, _allocatedEvaluatorHandlers, "subscribed to IAllocatedEvaluator handler: ");

    handlers[Constants.Handlers[Constants.TaskMessageHandler]] =
        SubscribeAll(_taskMessageSubscriber, _taskMessageHandlers, "subscribed to ITaskMessage handler: ");

    handlers[Constants.Handlers[Constants.ActiveContextHandler]] =
        SubscribeAll(_activeContextSubscriber, _activeContextHandlers, "subscribed to IActiveContext handler: ");

    handlers[Constants.Handlers[Constants.FailedTaskHandler]] =
        SubscribeAll(_failedTaskSubscriber, _failedTaskHandlers, "subscribed to IFailedTask handler: ");

    handlers[Constants.Handlers[Constants.RunningTaskHandler]] =
        SubscribeAll(_runningTaskSubscriber, _runningTaskHandlers, "subscribed to IRunningTask handler: ");

    handlers[Constants.Handlers[Constants.CompletedTaskHandler]] =
        SubscribeAll(_completedTaskSubscriber, _completedTaskHandlers, "subscribed to ICompletedTask handler: ");

    handlers[Constants.Handlers[Constants.SuspendedTaskHandler]] =
        SubscribeAll(_suspendedTaskSubscriber, _suspendedTaskHandlers, "subscribed to ISuspendedTask handler: ");

    handlers[Constants.Handlers[Constants.FailedEvaluatorHandler]] =
        SubscribeAll(_failedEvaluatorSubscriber, _failedEvaluatorHandlers, "subscribed to IFailedEvaluator handler: ");

    handlers[Constants.Handlers[Constants.CompletedEvaluatorHandler]] =
        SubscribeAll(_completedEvaluatorSubscriber, _completedEvaluatorHandlers, "subscribed to ICompletedEvaluator handler: ");

    handlers[Constants.Handlers[Constants.ClosedContextHandler]] =
        SubscribeAll(_closedContextSubscriber, _closedContextHandlers, "subscribed to IClosedContext handler: ");

    handlers[Constants.Handlers[Constants.FailedContextHandler]] =
        SubscribeAll(_failedContextSubscriber, _failedContextHandlers, "subscribed to IFailedContext handler: ");

    handlers[Constants.Handlers[Constants.ContextMessageHandler]] =
        SubscribeAll(_contextMessageSubscriber, _contextMessageHandlers, "subscribed to IContextMessage handler: ");

    handlers[Constants.Handlers[Constants.DriverRestartActiveContextHandler]] =
        SubscribeAll(
            _driverRestartActiveContextSubscriber,
            _driverRestartActiveContextHandlers,
            "subscribed to handler for IActiveContext received during driver restart: ");

    handlers[Constants.Handlers[Constants.DriverRestartRunningTaskHandler]] =
        SubscribeAll(
            _driverRestartRunningTaskSubscriber,
            _driverRestartRunningTaskHandlers,
            "subscribed to handler for IRunningTask received during driver restart: ");

    // The HTTP handler is also a single object rather than a set.
    _httpServerEventSubscriber.Subscribe(_httpServerHandler);
    _logger.Log(Level.Info, "subscribed to IHttpMessage handler :" + _httpServerHandler);
    handlers[Constants.Handlers[Constants.HttpServerHandler]] = ClrHandlerHelper.CreateHandler(_httpServerEventSubscriber);

    return handlers;
}

/// <summary>
/// Subscribes each observer to <paramref name="subscriber"/>, logging one
/// line per observer, then creates the handle for the subscriber.
/// </summary>
/// <typeparam name="T">Event type carried by the subscriber.</typeparam>
/// <param name="subscriber">Bridge-side handler the observers attach to.</param>
/// <param name="observers">Configured observers for this event type.</param>
/// <param name="logPrefix">Text logged in front of each observer.</param>
/// <returns>The value returned by <c>ClrHandlerHelper.CreateHandler</c>.</returns>
private ulong SubscribeAll<T>(ClrSystemHandler<T> subscriber, IEnumerable<IObserver<T>> observers, string logPrefix)
{
    foreach (IObserver<T> observer in observers)
    {
        subscriber.Subscribe(observer);
        _logger.Log(Level.Info, logPrefix + observer);
    }

    return ClrHandlerHelper.CreateHandler(subscriber);
}
using Org.Apache.REEF.Common.Context;
using Org.Apache.REEF.Common.Evaluator;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Driver.Task;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Wake.Time;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;

[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1401:FieldsMustBePrivate", Justification = "static field, typical usage in configurations")]

namespace Org.Apache.REEF.Driver.Bridge
{
    /// <summary>
    /// Configuration module builder for the driver bridge. Each static field is
    /// a slot a client may fill; <see cref="ConfigurationModule"/> binds every
    /// slot to the matching entry of <see cref="DriverBridgeConfigurationOptions"/>.
    /// </summary>
    public class DriverBridgeConfiguration : ConfigurationModuleBuilder
    {
        /// <summary>
        ///  The event handler invoked right after the driver boots up. 
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly RequiredImpl<IStartHandler> OnDriverStarted = new RequiredImpl<IStartHandler>();

        /// <summary>
        ///  The event handler invoked when driver restarts
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly OptionalImpl<IObserver<StartTime>> OnDriverRestarted = new OptionalImpl<IObserver<StartTime>>();

        /// <summary>
        /// The event handler for requesting evaluator
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly OptionalImpl<IObserver<IEvaluatorRequestor>> OnEvaluatorRequested = new OptionalImpl<IObserver<IEvaluatorRequestor>>();

        /// <summary>
        /// Event handler for allocated evaluators. Defaults to returning the evaluator if not bound.
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly OptionalImpl<IObserver<IAllocatedEvaluator>> OnEvaluatorAllocated = new OptionalImpl<IObserver<IAllocatedEvaluator>>();

        /// <summary>
        /// Event handler for completed evaluators. Defaults to logging if not bound.
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly OptionalImpl<IObserver<ICompletedEvaluator>> OnEvaluatorCompleted = new OptionalImpl<IObserver<ICompletedEvaluator>>();

        /// <summary>
        /// Event handler for failed evaluators. Defaults to job failure if not bound.
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly OptionalImpl<IObserver<IFailedEvaluator>> OnEvaluatorFailed = new OptionalImpl<IObserver<IFailedEvaluator>>();

        /// <summary>
        /// Event handler for HTTP events; bound into the
        /// <see cref="DriverBridgeConfigurationOptions.HttpEventHandlers"/> set.
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly OptionalImpl<IHttpHandler> OnHttpEvent = new OptionalImpl<IHttpHandler>();

        /// <summary>
        /// Event handler for task messages. Defaults to logging if not bound.
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly OptionalImpl<IObserver<ITaskMessage>> OnTaskMessage = new OptionalImpl<IObserver<ITaskMessage>>();

        /// <summary>
        /// Event handler for completed tasks. Defaults to closing the context the task ran on if not bound.
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly OptionalImpl<IObserver<ICompletedTask>> OnTaskCompleted = new OptionalImpl<IObserver<ICompletedTask>>();

        /// <summary>
        /// Event handler for failed tasks. Defaults to job failure if not bound.
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly OptionalImpl<IObserver<IFailedTask>> OnTaskFailed = new OptionalImpl<IObserver<IFailedTask>>();

        /// <summary>
        /// Event handler for running tasks. Defaults to logging if not bound.
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly OptionalImpl<IObserver<IRunningTask>> OnTaskRunning = new OptionalImpl<IObserver<IRunningTask>>();

        /// <summary>
        /// Event handler for running task received during driver restart. Defaults to logging if not bound.
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly OptionalImpl<IObserver<IRunningTask>> OnDriverRestartTaskRunning = new OptionalImpl<IObserver<IRunningTask>>();

        /// <summary>
        /// Event handler for suspended tasks. Defaults to job failure if not bound. Rationale: many jobs don't support
        /// task suspension. Hence, this parameter should be optional. The only sane default is to crash the job, then.
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly OptionalImpl<IObserver<ISuspendedTask>> OnTaskSuspended = new OptionalImpl<IObserver<ISuspendedTask>>();

        /// <summary>
        /// Event handler for active context. Defaults to closing the context if not bound.
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly OptionalImpl<IObserver<IActiveContext>> OnContextActive = new OptionalImpl<IObserver<IActiveContext>>();

        /// <summary>
        /// Event handler for active context received during driver restart. Defaults to closing the context if not bound.
        /// </summary>
        /// <remarks>
        /// The identifier is misspelled ("Dirver"); it is kept because it is part of the public surface.
        /// </remarks>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly OptionalImpl<IObserver<IActiveContext>> OnDirverRestartContextActive = new OptionalImpl<IObserver<IActiveContext>>();

        /// <summary>
        /// Event handler for closed context. Defaults to logging if not bound.
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly OptionalImpl<IObserver<IClosedContext>> OnContextClosed = new OptionalImpl<IObserver<IClosedContext>>();

        /// <summary>
        /// Event handler for failed context. Defaults to job failure if not bound.
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly OptionalImpl<IObserver<IFailedContext>> OnContextFailed = new OptionalImpl<IObserver<IFailedContext>>();

        /// <summary>
        /// Event handler for context messages. Defaults to logging if not bound.
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly OptionalImpl<IObserver<IContextMessage>> OnContextMessage = new OptionalImpl<IObserver<IContextMessage>>();

        /// <summary>
        /// Additional set of string arguments that can be passed to handlers through client
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly OptionalParameter<string> CommandLineArguments = new OptionalParameter<string>();

        /// <summary>
        /// The trace level of the TraceListener
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly OptionalParameter<string> CustomTraceLevel = new OptionalParameter<string>();

        /// <summary>
        /// Additional set of trace listeners provided by client
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly OptionalParameter<TraceListener> CustomTraceListeners = new OptionalParameter<TraceListener>();

        /// <summary>
        ///  The implementation for (attempting to) re-establish connection to driver
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly OptionalImpl<IDriverConnection> OnDriverReconnect = new OptionalImpl<IDriverConnection>();

        // This is currently not needed in Bridge/Driver model
        ///// <summary>
        ///// The event handler invoked right before the driver shuts down. Defaults to ignore.
        ///// </summary>
        //public static readonly OptionalImpl<IObserver<StopTime>> OnDriverStop = new OptionalImpl<IObserver<StopTime>>();

        // Client handlers only needed when client interactions are expected. Not enabled for now.
        ///// <summary>
        ///// Event handler for client messages. Defaults to logging if not bound.
        ///// </summary>
        //public static readonly OptionalImpl<IObserver<byte[]>> OnClientMessage = new OptionalImpl<IObserver<byte[]>>();

        // Client handlers only needed when client interactions are expected. Not enabled for now.
        ///// <summary>
        ///// Event handler for close messages sent by the client. Defaults to job failure if not bound.
        ///// Note: in java the type is void, but IObserver does not take void as a type
        ///// </summary>
        //public static readonly OptionalImpl<IObserver<byte[]>> OnClientClosed = new OptionalImpl<IObserver<byte[]>>();

        // Client handlers only needed when client interactions are expected. Not enabled for now.
        ///// <summary>
        ///// Event handler for close messages sent by the client. Defaults to job failure if not bound.
        ///// </summary>
        //public static readonly OptionalImpl<IObserver<byte[]>> OnClientClosedMessage = new OptionalImpl<IObserver<byte[]>>();

        /// <summary>
        /// Builds a new configuration module in which every slot declared above
        /// is bound to its named parameter or interface. A fresh module is
        /// built on every access.
        /// </summary>
        public static ConfigurationModule ConfigurationModule
        {
            get
            {
                return new DriverBridgeConfiguration()
                .BindImplementation(GenericType<IStartHandler>.Class, OnDriverStarted)
                .BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.DriverRestartHandler>.Class, OnDriverRestarted)
                .BindImplementation(GenericType<IDriverConnection>.Class, OnDriverReconnect)
                .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.EvaluatorRequestHandlers>.Class, OnEvaluatorRequested)
                .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.AllocatedEvaluatorHandlers>.Class, OnEvaluatorAllocated)
                .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.ActiveContextHandlers>.Class, OnContextActive)
                .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.TaskMessageHandlers>.Class, OnTaskMessage)
                .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.FailedTaskHandlers>.Class, OnTaskFailed)
                .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.RunningTaskHandlers>.Class, OnTaskRunning)
                .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.SuspendedTaskHandlers>.Class, OnTaskSuspended)
                .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.FailedEvaluatorHandlers>.Class, OnEvaluatorFailed)
                .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.CompletedEvaluatorHandlers>.Class, OnEvaluatorCompleted)
                .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.CompletedTaskHandlers>.Class, OnTaskCompleted)
                .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.ClosedContextHandlers>.Class, OnContextClosed)
                .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.FailedContextHandlers>.Class, OnContextFailed)
                .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.ContextMessageHandlers>.Class, OnContextMessage)
                .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.ArgumentSets>.Class, CommandLineArguments)
                .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.HttpEventHandlers>.Class, OnHttpEvent)
                .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.TraceListenersSet>.Class, CustomTraceListeners)
                .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartActiveContextHandlers>.Class, OnDirverRestartContextActive)
                .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartRunningTaskHandlers>.Class, OnDriverRestartTaskRunning)
                .BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.TraceLevel>.Class, CustomTraceLevel)
                .Build();
            }
        }
    }

    /// <summary>
    /// Injectable holder for the string arguments bound to
    /// <see cref="DriverBridgeConfigurationOptions.ArgumentSets"/>.
    /// </summary>
    public class CommandLineArguments
    {
        /// <param name="arguments">The injected argument set; stored as-is.</param>
        [Inject]
        public CommandLineArguments([Parameter(typeof(DriverBridgeConfigurationOptions.ArgumentSets))] ISet<string> arguments)
        {
            Arguments = arguments;
        }

        /// <summary>The argument set received at injection time.</summary>
        public ISet<string> Arguments { get; set; }
    }

    /// <summary>
    /// Injectable holder for the trace listeners bound to
    /// <see cref="DriverBridgeConfigurationOptions.TraceListenersSet"/>.
    /// </summary>
    public class CustomTraceListeners
    {
        /// <param name="listeners">The injected listener set; stored as-is.</param>
        [Inject]
        public CustomTraceListeners([Parameter(typeof(DriverBridgeConfigurationOptions.TraceListenersSet))] ISet<TraceListener> listeners)
        {
            Listeners = listeners;
        }

        /// <summary>The listener set received at injection time.</summary>
        public ISet<TraceListener> Listeners { get; set; }
    }

    /// <summary>
    /// Injectable holder that parses the string bound to
    /// <see cref="DriverBridgeConfigurationOptions.TraceLevel"/> into a
    /// <see cref="Level"/>.
    /// </summary>
    public class CustomTraceLevel
    {
        /// <summary>
        /// Parses <paramref name="traceLevel"/>. On success the parsed level is
        /// also installed through <c>Logger.SetCustomLevel</c>; on failure a
        /// message is written to the console and <see cref="TraceLevel"/> is
        /// <c>Level.Verbose</c>.
        /// </summary>
        /// <param name="traceLevel">Name of a <see cref="Level"/> member; may be null.</param>
        [Inject]
        public CustomTraceLevel([Parameter(typeof(DriverBridgeConfigurationOptions.TraceLevel))] string traceLevel)
        {
            Level level;

            // Enum.TryParse returns false for null/unknown input instead of throwing.
            if (Enum.TryParse(traceLevel, out level))
            {
                Logger.SetCustomLevel(level);
            }
            else
            {
                // TryParse overwrites its out argument with default(Level) on
                // failure, so the Verbose fallback has to be assigned here,
                // after the call, rather than as an initial value before it.
                level = Level.Verbose;
                Console.WriteLine("Cannot parse trace level " + traceLevel);
            }

            TraceLevel = level;
        }

        /// <summary>The parsed level, or <c>Level.Verbose</c> when parsing failed.</summary>
        public Level TraceLevel { get; set; }
    }
}
using Org.Apache.REEF.Common.Context;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Defaults;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Driver.Task;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Wake.Time;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;

[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1402:FileMayOnlyContainASingleClass", Justification = "allow name parameter class to be embedded")]

namespace Org.Apache.REEF.Driver.Bridge
{
    /// <summary>
    /// Container for the named parameters used to configure a driver and its
    /// bridge handlers. Every nested class is an empty marker type whose
    /// attribute carries the documentation string and default binding.
    /// </summary>
    public class DriverBridgeConfigurationOptions
    {
        // Default for the TraceLevel parameter: the name of Level.Verbose,
        // spelled as a string because TANG does not support enum parameters.
        private const string DefaultTraceLevelName = "Verbose";

        // ---------------------------------------------------------------
        // Driver restart
        // ---------------------------------------------------------------

        [NamedParameter(
            documentation: "Called when driver is restarted, after CLR bridge is set up.",
            defaultClasses: new Type[] { typeof(DefaultDriverRestartHandler) })]
        public class DriverRestartHandler : Name<IObserver<StartTime>> { }

        [NamedParameter(
            documentation: "Running task during driver restart handler.",
            defaultClasses: new Type[] { typeof(DefaultDriverRestartTaskRunningHandler) })]
        public class DriverRestartRunningTaskHandlers : Name<ISet<IObserver<IRunningTask>>> { }

        [NamedParameter(
            documentation: "Handler for IActiveContext received during driver restart.",
            defaultClasses: new Type[] { typeof(DefaultDriverRestartContextActiveHandler) })]
        public class DriverRestartActiveContextHandlers : Name<ISet<IObserver<IActiveContext>>> { }

        // ---------------------------------------------------------------
        // Evaluators
        // ---------------------------------------------------------------

        [NamedParameter(
            documentation: "Called when evaluator is requested.",
            defaultClasses: new Type[] { typeof(DefaultEvaluatorRequestorHandler) })]
        public class EvaluatorRequestHandlers : Name<ISet<IObserver<IEvaluatorRequestor>>> { }

        [NamedParameter(
            documentation: "Called when an allocated evaluator is given to the client.",
            defaultClasses: new Type[] { typeof(DefaultEvaluatorAllocationHandler) })]
        public class AllocatedEvaluatorHandlers : Name<ISet<IObserver<IAllocatedEvaluator>>> { }

        [NamedParameter(
            documentation: "Called when an evaluator completes.",
            defaultClasses: new Type[] { typeof(DefaultEvaluatorCompletionHandler) })]
        public class CompletedEvaluatorHandlers : Name<ISet<IObserver<ICompletedEvaluator>>> { }

        [NamedParameter(
            documentation: "Called when an exception occurs on a running evaluator.",
            defaultClasses: new Type[] { typeof(DefaultEvaluatorFailureHandler) })]
        public class FailedEvaluatorHandlers : Name<ISet<IObserver<IFailedEvaluator>>> { }

        // ---------------------------------------------------------------
        // Tasks
        // ---------------------------------------------------------------

        [NamedParameter(
            documentation: "Running task handler.",
            defaultClasses: new Type[] { typeof(DefaultTaskRunningHandler) })]
        public class RunningTaskHandlers : Name<ISet<IObserver<IRunningTask>>> { }

        [NamedParameter(
            documentation: "Task message handler.",
            defaultClasses: new Type[] { typeof(DefaultTaskMessageHandler) })]
        public class TaskMessageHandlers : Name<ISet<IObserver<ITaskMessage>>> { }

        [NamedParameter(
            documentation: "Suspended task handler.",
            defaultClasses: new Type[] { typeof(DefaultTaskSuspensionHandler) })]
        public class SuspendedTaskHandlers : Name<ISet<IObserver<ISuspendedTask>>> { }

        [NamedParameter(
            documentation: "Completed task handler.",
            defaultClasses: new Type[] { typeof(DefaultTaskCompletionHandler) })]
        public class CompletedTaskHandlers : Name<ISet<IObserver<ICompletedTask>>> { }

        [NamedParameter(
            documentation: "Task exception handler.",
            defaultClasses: new Type[] { typeof(DefaultTaskFailureHandler) })]
        public class FailedTaskHandlers : Name<ISet<IObserver<IFailedTask>>> { }

        // ---------------------------------------------------------------
        // Contexts
        // ---------------------------------------------------------------

        [NamedParameter(
            documentation: "Handler for IActiveContext.",
            defaultClasses: new Type[] { typeof(DefaultContextActiveHandler) })]
        public class ActiveContextHandlers : Name<ISet<IObserver<IActiveContext>>> { }

        [NamedParameter(
            documentation: "Handler for ContextMessage.",
            defaultClasses: new Type[] { typeof(DefaultContextMessageHandler) })]
        public class ContextMessageHandlers : Name<ISet<IObserver<IContextMessage>>> { }

        [NamedParameter(
            documentation: "Handler for ClosedContext.",
            defaultClasses: new Type[] { typeof(DefaultContextClosureHandler) })]
        public class ClosedContextHandlers : Name<ISet<IObserver<IClosedContext>>> { }

        [NamedParameter(
            documentation: "Handler for FailedContext.",
            defaultClasses: new Type[] { typeof(DefaultContextFailureHandler) })]
        public class FailedContextHandlers : Name<ISet<IObserver<IFailedContext>>> { }

        // ---------------------------------------------------------------
        // HTTP, command line and tracing
        // ---------------------------------------------------------------

        [NamedParameter(
            documentation: "Http Event Handlers.",
            defaultClasses: new Type[] { typeof(DefaultHttpHandler) })]
        public class HttpEventHandlers : Name<ISet<IHttpHandler>> { }

        [NamedParameter("Command Line Arguments supplied by client", "CommandLineArguments", null)]
        public class ArgumentSets : Name<ISet<string>> { }

        [NamedParameter("Additional trace listners supplied by client", "TraceListeners", null, defaultClasses: new Type[] { typeof(DefaultCustomTraceListener) })]
        public class TraceListenersSet : Name<ISet<TraceListener>> { }

        [NamedParameter("Custom Trace Level", "TraceLevel", defaultValue: DefaultTraceLevelName)]
        public class TraceLevel : Name<string> { }

        // Client-facing parameters, currently disabled:
        //[NamedParameter(documentation: "Job message handler.", defaultClasses: new[] { typeof(DefaultClientMessageHandler) })]
        //public class ClientMessageHandlers : Name<ISet<IObserver<byte[]>>>
        //{
        //}

        //[NamedParameter(documentation: "Client close handler.", defaultClasses: new[] { typeof(DefaultClientCloseHandler) })]
        //public class ClientCloseHandlers : Name<ISet<IObserver<byte[]>>>
        //{
        //}
    }
}
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Runtime.Serialization;
+
+namespace Org.Apache.REEF.Driver.Bridge
+{
+    /// <summary>
+    /// IHttpMessage implementation that forwards every call to the wrapped
+    /// IHttpServerBridgeClr2Java instance.
+    /// </summary>
+    [DataContract]
+    public class HttpMessage : IHttpMessage
+    {
+        /// <summary>
+        /// Initializes a new instance of the <see cref="HttpMessage" /> class around the given bridge object.
+        /// </summary>
+        /// <param name="httpServerBridgeClr2Java">Bridge that every member of this class delegates to; must not be null.</param>
+        /// <exception cref="System.ArgumentNullException">Thrown when httpServerBridgeClr2Java is null.</exception>
+        public HttpMessage(IHttpServerBridgeClr2Java httpServerBridgeClr2Java)
+        {
+            // Fail here, at the construction site, rather than with a NullReferenceException
+            // on the first delegated call.
+            if (httpServerBridgeClr2Java == null)
+            {
+                throw new System.ArgumentNullException("httpServerBridgeClr2Java");
+            }
+
+            HttpServerBridgeClr2Java = httpServerBridgeClr2Java;
+        }
+
+        /// <summary>
+        /// The wrapped bridge object; marked as a data member of the data contract.
+        /// </summary>
+        [DataMember]
+        private IHttpServerBridgeClr2Java HttpServerBridgeClr2Java { get; set; }
+
+        /// <summary>
+        /// Returns the value of GetQueryString() on the wrapped bridge.
+        /// </summary>
+        /// <returns>The query string reported by the bridge.</returns>
+        public string GetRequestString()
+        {
+            return HttpServerBridgeClr2Java.GetQueryString();
+        }
+
+        /// <summary>
+        /// Passes the response string to SetQueryResult on the wrapped bridge.
+        /// </summary>
+        /// <param name="responseString">The response string to forward.</param>
+        public void SetQueryResult(string responseString)
+        {
+            HttpServerBridgeClr2Java.SetQueryResult(responseString);
+        }
+
+        /// <summary>
+        /// Returns the value of GetQueryRequestData() on the wrapped bridge.
+        /// The misspelled method name is what callers in this commit use and is kept for compatibility.
+        /// </summary>
+        /// <returns>The request bytes reported by the bridge.</returns>
+        public byte[] GetQueryReuestData()
+        {
+            return HttpServerBridgeClr2Java.GetQueryRequestData();
+        }
+
+        /// <summary>
+        /// Passes the response bytes to SetQueryResponseData on the wrapped bridge.
+        /// </summary>
+        /// <param name="responseData">The response bytes to forward.</param>
+        public void SetQueryResponseData(byte[] responseData)
+        {
+            HttpServerBridgeClr2Java.SetQueryResponseData(responseData);
+        }
+
+        /// <summary>
+        /// Passes the URI specification string to SetUriSpecification on the wrapped bridge.
+        /// </summary>
+        /// <param name="uriSpecification">The specification string to forward.</param>
+        public void SetUriSpecification(string uriSpecification)
+        {
+            HttpServerBridgeClr2Java.SetUriSpecification(uriSpecification);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/bridge/HttpServerHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/bridge/HttpServerHandler.cs b/lang/cs/Org.Apache.REEF.Driver/bridge/HttpServerHandler.cs
new file mode 100644
index 0000000..c6691e7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/bridge/HttpServerHandler.cs
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Avro;
+using Org.Apache.REEF.Driver.bridge;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Linq;
+using System.Net;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Driver.Bridge
+{
+    /// <summary>
+    /// HttpServerHandler, the handler for all CLR http events.
+    /// It keeps one IHttpHandler per specification and dispatches each incoming
+    /// IHttpMessage to the handler whose specification matches the request path.
+    /// </summary>
+    public class HttpServerHandler : IObserver<IHttpMessage>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(HttpServerHandler));
+
+        // Request string that asks for the list of registered specifications instead of a dispatch.
+        private const string SPEC = "SPEC";
+
+        // Registered handlers, keyed by their specification lower-cased with the invariant culture.
+        private readonly IDictionary<string, IHttpHandler> eventHandlers = new Dictionary<string, IHttpHandler>();
+
+        private readonly HttpServerPort httpServerPort;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="HttpServerHandler" /> class.
+        /// </summary>
+        /// <param name="httpEventHandlers">The HTTP event handlers.</param>
+        /// <param name="httpServerPort">The HTTP server port.</param>
+        /// <exception cref="ArgumentException">
+        /// Reported through Exceptions.Throw when a specification contains ':' or when two handlers
+        /// share the same specification (ignoring case).
+        /// </exception>
+        [Inject]
+        public HttpServerHandler([Parameter(Value = typeof(DriverBridgeConfigurationOptions.HttpEventHandlers))] ISet<IHttpHandler> httpEventHandlers,
+                                 HttpServerPort httpServerPort)
+        {
+            LOGGER.Log(Level.Info, "Constructing HttpServerHandler");
+            foreach (var h in httpEventHandlers)
+            {
+                string spec = h.GetSpecification();
+
+                // ':' is the separator used by GetAllSpecifications, so it cannot appear inside a spec.
+                if (spec.Contains(":"))
+                {
+                    Exceptions.Throw(new ArgumentException("spec cannot contain :"), "The http spec given is " + spec, LOGGER);
+                }
+                LOGGER.Log(Level.Info, "HttpHandler spec:" + spec);
+
+                string key = NormalizeSpecification(spec);
+                if (eventHandlers.ContainsKey(key))
+                {
+                    // Dictionary.Add would also raise ArgumentException here, but without naming the spec.
+                    Exceptions.Throw(new ArgumentException("duplicate http spec"), "More than one http handler is registered for spec " + spec, LOGGER);
+                }
+                eventHandlers.Add(key, h);
+            }
+            this.httpServerPort = httpServerPort;
+        }
+
+        /// <summary>
+        /// Called when receiving an http request from Java side.
+        /// A request string equal to "SPEC" is answered with the ':'-joined list of registered
+        /// specifications; any other message is decoded as an Avro http request and dispatched
+        /// to the handler registered for the second '/'-separated segment of its path.
+        /// </summary>
+        /// <param name="httpMessage">The HTTP message.</param>
+        public void OnNext(IHttpMessage httpMessage)
+        {
+            LOGGER.Log(Level.Info, "HttpHandler OnNext is called");
+            string requestString = httpMessage.GetRequestString();
+
+            // Null-safe ordinal comparison; equivalent to the null check plus Equals(string).
+            if (string.Equals(requestString, SPEC, StringComparison.Ordinal))
+            {
+                LOGGER.Log(Level.Info, "HttpHandler OnNext, requestString:" + requestString);
+                LOGGER.Log(Level.Info, "HttpHandler OnNext, port number:" + httpServerPort.PortNumber);
+
+                httpMessage.SetUriSpecification(GetAllSpecifications());
+                return;
+            }
+
+            LOGGER.Log(Level.Info, "HttpHandler OnNext, handling http request.");
+            byte[] byteData = httpMessage.GetQueryReuestData();
+            AvroHttpRequest avroHttpRequest = AvroHttpSerializer.FromBytes(byteData);
+            LOGGER.Log(Level.Info, "HttpHandler OnNext, requestData:" + avroHttpRequest);
+
+            string spec = GetSpecification(avroHttpRequest.PathInfo);
+            if (spec == null)
+            {
+                // The path carries no handler segment. As before, no response data is set;
+                // the only change is that the dropped request is now visible in the log.
+                LOGGER.Log(Level.Info, "HttpHandler OnNext, no specification found in request path; no response is set.");
+                return;
+            }
+
+            LOGGER.Log(Level.Info, "HttpHandler OnNext, target:" + spec);
+            ReefHttpRequest request = ToHttpRequest(avroHttpRequest);
+            ReefHttpResponse response = new ReefHttpResponse();
+
+            byte[] responseData;
+            IHttpHandler handler;
+            if (eventHandlers.TryGetValue(NormalizeSpecification(spec), out handler) && handler != null)
+            {
+                LOGGER.Log(Level.Info, "HttpHandler OnNext, get eventHandler:" + handler.GetSpecification());
+                handler.OnHttpRequest(request, response);
+                responseData = response.OutputStream;
+            }
+            else
+            {
+                responseData =
+                    ByteUtilities.StringToByteArrays(string.Format(CultureInfo.CurrentCulture,
+                                                                   "No event handler found at CLR side for {0}.",
+                                                                   spec));
+            }
+            httpMessage.SetQueryResponseData(responseData);
+        }
+
+        /// <summary>
+        /// Not implemented; always throws <see cref="NotImplementedException" />.
+        /// </summary>
+        public void OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Not implemented; always throws <see cref="NotImplementedException" />.
+        /// </summary>
+        /// <param name="error">The error reported by the observable; ignored.</param>
+        public void OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Produces the dictionary key for a specification. The invariant culture is used so that
+        /// registration and lookup agree regardless of the culture of the calling thread.
+        /// </summary>
+        private static string NormalizeSpecification(string spec)
+        {
+            return spec.ToLowerInvariant();
+        }
+
+        /// <summary>
+        /// Returns all registered (normalized) specifications joined with ':'.
+        /// </summary>
+        private string GetAllSpecifications()
+        {
+            return string.Join(":", eventHandlers.Keys);
+        }
+
+        /// <summary>
+        /// Returns the second '/'-separated segment of the request URI, or null when the URI
+        /// is null or contains no '/'.
+        /// </summary>
+        private string GetSpecification(string requestUri)
+        {
+            if (requestUri == null)
+            {
+                return null;
+            }
+
+            string[] parts = requestUri.Split('/');
+            return parts.Length > 1 ? parts[1] : null;
+        }
+
+        /// <summary>
+        /// Copies the fields of the Avro request into a new ReefHttpRequest.
+        /// </summary>
+        private ReefHttpRequest ToHttpRequest(AvroHttpRequest avroRequest)
+        {
+            ReefHttpRequest httpRequest = new ReefHttpRequest();
+            httpRequest.PathInfo = avroRequest.PathInfo;
+            httpRequest.InputStream = avroRequest.InputStream;
+            httpRequest.Url = avroRequest.RequestUrl;
+            httpRequest.Querystring = avroRequest.QueryString;
+
+            // The parse result is deliberately not checked: when the method string is not
+            // recognised, m keeps its default value.
+            // NOTE(review): HttpMethod is presumably a project enum parsed case-insensitively - confirm.
+            HttpMethod m;
+            HttpMethod.TryParse(avroRequest.HttpMethod, true, out m);
+            httpRequest.Method = m;
+            return httpRequest;
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/bridge/HttpServerPort.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/bridge/HttpServerPort.cs b/lang/cs/Org.Apache.REEF.Driver/bridge/HttpServerPort.cs
new file mode 100644
index 0000000..50fb915
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/bridge/HttpServerPort.cs
@@ -0,0 +1,33 @@
+/**
+ * Licensed
to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Driver.bridge
+{
+    /// <summary>
+    /// Injectable holder for the port number of the HTTP server.
+    /// </summary>
+    public class HttpServerPort
+    {
+        /// <summary>
+        /// Initializes a new instance of the <see cref="HttpServerPort" /> class.
+        /// The constructor is marked for injection and sets nothing, so
+        /// <see cref="PortNumber" /> starts at its default value of 0.
+        /// </summary>
+        [Inject]
+        public HttpServerPort()
+        {
+        }
+
+        /// <summary>
+        /// Gets or sets the port number.
+        /// </summary>
+        // NOTE(review): nothing in this file assigns the value; presumably it is set by the
+        // component that starts the HTTP server - confirm against the caller.
+        public int PortNumber { get; set; }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/bridge/IHttpHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/bridge/IHttpHandler.cs b/lang/cs/Org.Apache.REEF.Driver/bridge/IHttpHandler.cs
new file mode 100644
index 0000000..409c974
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/bridge/IHttpHandler.cs
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Driver.Bridge;
+
+namespace Org.Apache.REEF.Driver.Bridge
+{
+    /// <summary>
+    /// Contract for a handler of HTTP requests. HttpServerHandler registers each handler under
+    /// its specification and calls it for requests whose path selects that specification.
+    /// </summary>
+    public interface IHttpHandler
+    {
+        /// <summary>
+        /// Defines the specification of the handler. ":" is not allowed in the specification,
+        /// because HttpServerHandler joins all specifications with ":" and rejects any that contain it.
+        /// HttpServerHandler lower-cases the specification before using it as a lookup key.
+        /// </summary>
+        /// <returns>string specification</returns>
+        string GetSpecification();
+
+        /// <summary>
+        /// Called when an HTTP request is sent to this handler.
+        /// After the call, HttpServerHandler sends the response's OutputStream back as the response data.
+        /// </summary>
+        /// <param name="requet">The request. (The misspelled parameter name is part of the interface and is kept.)</param>
+        /// <param name="resonse">The response to fill in. (The misspelled parameter name is part of the interface and is kept.)</param>
+        void OnHttpRequest(ReefHttpRequest requet, ReefHttpResponse resonse);
+    }
+}
