http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/bridge/ClrClientHelper.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/bridge/ClrClientHelper.cs b/lang/cs/Org.Apache.REEF.Driver/bridge/ClrClientHelper.cs deleted file mode 100644 index 03f385a..0000000 --- a/lang/cs/Org.Apache.REEF.Driver/bridge/ClrClientHelper.cs +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.IO; -using System.Linq; -using Org.Apache.REEF.Driver.Bridge; -using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Tang.Formats; -using Org.Apache.REEF.Tang.Interface; - -namespace Org.Apache.REEF.Driver.bridge -{ - public class ClrClientHelper - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(ClrClientHelper)); - - public static void Run(HashSet<string> appDlls, IConfiguration driverBridgeConfig, DriverSubmissionSettings driverSubmissionSettings, string reefJar = Constants.BridgeJarFileName, string runCommand = "run.cmd", string clrFolder = ".", string className = Constants.BridgeLaunchClass) - { - using (LOGGER.LogFunction("ClrHandlerHelper::Run")) - { - if (driverSubmissionSettings.Submit) - { - ClrHandlerHelper.CopyDllsToAppDirectory(appDlls); - UpdateJarFileWithAssemblies(reefJar); - } - - using (LOGGER.LogScope("ClrHandlerHelper::serialize driverBridgeConfig to clrRuntimeConfigFile")) - { - string clrRuntimeConfigFile = Path.Combine(clrFolder, Constants.DriverBridgeConfiguration); - new AvroConfigurationSerializer().ToFile(driverBridgeConfig, clrRuntimeConfigFile); - LOGGER.Log(Level.Info, "CLR driver bridge configurations written to " + clrRuntimeConfigFile); - } - - ProcessStartInfo startInfo = new ProcessStartInfo(); - if (driverSubmissionSettings.RunOnYarn) - { - startInfo.FileName = runCommand; - startInfo.Arguments = className + " " + clrFolder + - driverSubmissionSettings.ToComamndLineArguments(); - } - else - { - startInfo.FileName = GetJavaBinary(); - string loggingPrefix = string.Empty; - if (driverSubmissionSettings.JavaLogLevel == JavaLoggingSetting.VERBOSE_TO_CLR) - { - loggingPrefix = Constants.JavaToCLRLoggingConfig + " "; - } - else if (driverSubmissionSettings.JavaLogLevel == JavaLoggingSetting.VERBOSE) - { - loggingPrefix = Constants.JavaVerboseLoggingConfig + " "; - } - startInfo.Arguments = loggingPrefix + @"-classpath " + reefJar + " " + Constants.BridgeLaunchClass + - " " + clrFolder + " " + driverSubmissionSettings.ToComamndLineArguments(); - } - startInfo.RedirectStandardOutput = true; - startInfo.UseShellExecute = false; - startInfo.CreateNoWindow = false; - LOGGER.Log(Level.Info, "Executing\r\n" + startInfo.FileName + "\r\n" + startInfo.Arguments); - using (Process process = Process.Start(startInfo)) - { - process.WaitForExit(); - } - } - } - - public static void UpdateJarFileWithAssemblies(string reefJar) - { - using (LOGGER.LogFunction("ClrHandlerHelper::UpdateJarFileWithAssemblies")) - { - string assembliesList = ClrHandlerHelper.GetAssembliesListForReefDriverApp(); - if (!File.Exists(reefJar)) - { - throw new InvalidOperationException("cannot find reef jar file: " + reefJar); - } - ProcessStartInfo startInfo = new ProcessStartInfo() - { - FileName = GetJarBinary(), - Arguments = @"uf " + reefJar + " " + assembliesList, - RedirectStandardOutput = true, - RedirectStandardError = true, - UseShellExecute = false, - CreateNoWindow = true - }; - - LOGGER.Log(Level.Info, "updating jar file with \r\n" + startInfo.FileName + "\r\n" + startInfo.Arguments); - using (Process process = Process.Start(startInfo)) - { - StreamReader outReader = process.StandardOutput; - StreamReader errorReader = process.StandardError; - string output = outReader.ReadToEnd(); - string error = errorReader.ReadToEnd(); - process.WaitForExit(); - if (process.ExitCode != 0) - { - throw new InvalidOperationException("Failed to update jar file with stdout :" + output + - "and stderr:" + error); - } - } - LOGGER.Log(Level.Info, "jar file updated."); - } - } - - public static void ExtractConfigfileFromJar(string reefJar, IList<string> configFiles, string dropFolder) - { - var configFileNames = string.Join(" ", configFiles.ToArray()); - ProcessStartInfo startInfo = new ProcessStartInfo() - { - FileName = GetJarBinary(), - Arguments = @"xf " + reefJar + " " + configFileNames, - RedirectStandardOutput = true, - RedirectStandardError = true, - UseShellExecute = false, - CreateNoWindow = true - }; - - LOGGER.Log(Level.Info, "extracting files from jar file with \r\n" + startInfo.FileName + "\r\n" + startInfo.Arguments); - using (Process process = Process.Start(startInfo)) - { - StreamReader outReader = process.StandardOutput; - StreamReader errorReader = process.StandardError; - string output = outReader.ReadToEnd(); - string error = errorReader.ReadToEnd(); - process.WaitForExit(); - if (process.ExitCode != 0) - { - throw new InvalidOperationException("Failed to extract files from jar file with stdout :" + output + - "and stderr:" + error); - } - } - LOGGER.Log(Level.Info, "files are extracted."); - } - - private static string GetJarBinary() - { - string javaHome = Environment.GetEnvironmentVariable("JAVA_HOME"); - if (string.IsNullOrWhiteSpace(javaHome)) - { - LOGGER.Log(Level.Info, "JAVA_HOME not set. Please set JAVA_HOME environment variable first. Exiting..."); - Environment.Exit(1); - } - return Path.Combine(javaHome, "bin", "jar.exe"); - } - - private static string GetJavaBinary() - { - string javaHome = Environment.GetEnvironmentVariable("JAVA_HOME"); - if (string.IsNullOrWhiteSpace(javaHome)) - { - LOGGER.Log(Level.Info, "JAVA_HOME not set. Please set JAVA_HOME environment variable first. Exiting..."); - Environment.Exit(1); - } - return Path.Combine(javaHome, "bin", "java.exe"); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/bridge/ClrHandlerHelper.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/bridge/ClrHandlerHelper.cs b/lang/cs/Org.Apache.REEF.Driver/bridge/ClrHandlerHelper.cs deleted file mode 100644 index 8d9020c..0000000 --- a/lang/cs/Org.Apache.REEF.Driver/bridge/ClrHandlerHelper.cs +++ /dev/null @@ -1,178 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Utilities.Diagnostics; -using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Tang.Exceptions; -using Org.Apache.REEF.Tang.Formats; -using Org.Apache.REEF.Tang.Implementations; -using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Tang.Protobuf; -using System; -using System.Collections; -using System.Collections.Generic; -using System.Globalization; -using System.IO; -using System.Linq; -using System.Runtime.InteropServices; -using Org.Apache.REEF.Tang.Implementations.Tang; - -namespace Org.Apache.REEF.Driver.Bridge -{ - public class ClrHandlerHelper - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(ClrHandlerHelper)); - - public static string[] ReefAssemblies - { - get - { - return new[] { "Microsoft.Hadoop.Avro.dll", "Org.Apache.REEF.Driver.dll", "Org.Apache.REEF.Common.dll", "Org.Apache.REEF.Utilities.dll", "Org.Apache.REEF.Network.dll", "Org.Apache.REEF.Tang.dll", "Org.Apache.REEF.Wake.dll", "Newtonsoft.Json.dll", "protobuf-net.dll" }; - } - } - - internal static int MemoryGranularity { get; set; } - - public static ulong CreateHandler(object handler) - { - GCHandle gc = GCHandle.Alloc(handler); - IntPtr intPtr = GCHandle.ToIntPtr(gc); - ulong ul = (ulong)intPtr.ToInt64(); - return ul; - } - - public static void FreeHandle(ulong handle) - { - GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle); - gc.Free(); - } - - public static void SetMemoryGranuality(int granularity) - { - if (granularity <= 0) - { - var e = new ArgumentException("granularity must be a positive value, provided: " + granularity); - Exceptions.Throw(e, LOGGER); - } - MemoryGranularity = granularity; - } - - public static ulong CreateNullHandler() - { - return Constants.NullHandler; - } - - public static ISet<string> GetCommandLineArguments() - { - using (LOGGER.LogFunction("ClrHandlerHelper::GetCommandLineArguments")) - { - string bridgeConfiguration = Path.Combine(Directory.GetCurrentDirectory(), "reef", "global", - Constants.DriverBridgeConfiguration); - - if (!File.Exists(bridgeConfiguration)) - { - string error = "Configuraiton file not found: " + bridgeConfiguration; - LOGGER.Log(Level.Error, error); - Exceptions.Throw(new InvalidOperationException(error), LOGGER); - } - CommandLineArguments arguments; - IInjector injector; - try - { - IConfiguration driverBridgeConfiguration = - new AvroConfigurationSerializer().FromFile(bridgeConfiguration); - injector = TangFactory.GetTang().NewInjector(driverBridgeConfiguration); - arguments = injector.GetInstance<CommandLineArguments>(); - } - catch (InjectionException e) - { - string error = "Cannot inject command line arguments from driver bridge configuration. "; - Exceptions.Caught(e, Level.Error, error, LOGGER); - // return empty string set - return new HashSet<string>(); - } - return arguments.Arguments; - } - } - - public static void SupplyAdditionalClassPath(params string[] classPaths) - { - string path = Path.Combine(Directory.GetCurrentDirectory(), Constants.GlobalUserSuppliedJavaLibraries); - File.Delete(path); - File.WriteAllText(path, string.Join(",", classPaths)); - } - - public static void GenerateClassHierarchy(HashSet<string> clrDlls) - { - using (LOGGER.LogFunction("ClrHandlerHelper::GenerateClassHierarchy")) - { - IClassHierarchy ns = TangFactory.GetTang().GetClassHierarchy(clrDlls.ToArray()); - ProtocolBufferClassHierarchy.Serialize(Constants.ClassHierarachyBin, ns); - - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Class hierarchy written to [{0}].", Path.Combine(Directory.GetCurrentDirectory(), Constants.ClassHierarachyBin))); - } - } - - public static string GetAssembliesListForReefDriverApp() - { - using (LOGGER.LogFunction("ClrHandlerHelper::GetAssembliesListForReefDriverApp")) - { - string executionDirectory = Directory.GetCurrentDirectory(); - IList<string> assemblies = - Directory.GetFiles(Path.Combine(executionDirectory, Constants.DriverAppDirectory), "*.dll") - .Select(f => string.Format(CultureInfo.InvariantCulture, "\"{0}\"", Constants.DriverAppDirectory + @"\" + Path.GetFileName(f))).ToList(); - - foreach (string reefAssembly in ReefAssemblies) - { - if (!File.Exists(reefAssembly)) - { - var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Assembly [{0}] needed for REEF driver not found in {1}", reefAssembly, executionDirectory)); - Exceptions.Throw(e, LOGGER); - } - File.Copy(reefAssembly, Path.Combine(executionDirectory, Constants.DriverAppDirectory, reefAssembly), overwrite: true); - assemblies.Add(string.Format(CultureInfo.InvariantCulture, "\"{0}\"", Constants.DriverAppDirectory + @"\" + reefAssembly)); - } - return string.Join(" ", assemblies); - } - } - - public static void CopyDllsToAppDirectory(HashSet<string> dlls) - { - using (LOGGER.LogFunction("ClrHandlerHelper::CopyDllsToAppDirectory")) - { - string executionDirectory = Directory.GetCurrentDirectory(); - Directory.CreateDirectory(Path.Combine(executionDirectory, Constants.DriverAppDirectory)); - foreach (string dll in dlls) - { - string dllFile = dll; - if (!dll.EndsWith(".dll", StringComparison.OrdinalIgnoreCase)) - { - dllFile += ".dll"; - } - if (!File.Exists(dllFile)) - { - var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Assembly [{0}] for REEF application not found in {1}", dllFile, executionDirectory)); - Exceptions.Throw(e, LOGGER); - } - File.Copy(dllFile, Path.Combine(executionDirectory, Constants.DriverAppDirectory, dllFile), overwrite: true); - } - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/bridge/ClrSystemHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/bridge/ClrSystemHandler.cs b/lang/cs/Org.Apache.REEF.Driver/bridge/ClrSystemHandler.cs deleted file mode 100644 index 15b957d..0000000 --- a/lang/cs/Org.Apache.REEF.Driver/bridge/ClrSystemHandler.cs +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Collections.Generic; - -namespace Org.Apache.REEF.Driver.Bridge -{ - public class ClrSystemHandler<T> : IObserver<T>, IObservable<T> - { - List<IObserver<T>> userHandlers = new List<IObserver<T>>(); - - public void OnNext(T value) - { - foreach (var observer in userHandlers) - { - observer.OnNext(value); - } - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - - public IDisposable Subscribe(IObserver<T> observer) - { - userHandlers.Add(observer); - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/bridge/ClrSystemHandlerWrapper.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/bridge/ClrSystemHandlerWrapper.cs b/lang/cs/Org.Apache.REEF.Driver/bridge/ClrSystemHandlerWrapper.cs deleted file mode 100644 index 58deabc..0000000 --- a/lang/cs/Org.Apache.REEF.Driver/bridge/ClrSystemHandlerWrapper.cs +++ /dev/null @@ -1,274 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Globalization; -using System.IO; -using System.Runtime.InteropServices; -using Org.Apache.REEF.Common.Context; -using Org.Apache.REEF.Driver.bridge; -using Org.Apache.REEF.Driver.Context; -using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Driver.Task; -using Org.Apache.REEF.Utilities.Diagnostics; -using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Tang.Formats; -using Org.Apache.REEF.Tang.Implementations; -using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Wake.Time; -using Org.Apache.REEF.Tang.Implementations.Tang; - -namespace Org.Apache.REEF.Driver.Bridge -{ - public class ClrSystemHandlerWrapper - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(ClrSystemHandlerWrapper)); - - private static DriverBridge _driverBridge; - - public static void Call_ClrSystemAllocatedEvaluatorHandler_OnNext(ulong handle, IAllocatedEvaluaotrClr2Java clr2Java) - { - using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemAllocatedEvaluatorHandler_OnNext")) - { - GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle); - ClrSystemHandler<IAllocatedEvaluator> obj = (ClrSystemHandler<IAllocatedEvaluator>)gc.Target; - obj.OnNext(new AllocatedEvaluator(clr2Java)); - } - } - - public static void Call_ClrSystemActiveContextHandler_OnNext(ulong handle, IActiveContextClr2Java clr2Java) - { - using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemActiveContextHandler_OnNext")) - { - GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle); - ClrSystemHandler<IActiveContext> obj = (ClrSystemHandler<IActiveContext>)gc.Target; - obj.OnNext(new ActiveContext(clr2Java)); - } - } - - public static void Call_ClrSystemDriverRestartActiveContextHandler_OnNext(ulong handle, IActiveContextClr2Java clr2Java) - { - using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemDriverRestartActiveContextHandler_OnNext")) - { - GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle); - ClrSystemHandler<IActiveContext> obj = (ClrSystemHandler<IActiveContext>)gc.Target; - obj.OnNext(new ActiveContext(clr2Java)); - } - } - - public static void Call_ClrSystemEvaluatorRequestor_OnNext(ulong handle, IEvaluatorRequestorClr2Java clr2Java) - { - using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemEvaluatorRequestor_OnNext")) - { - GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle); - ClrSystemHandler<IEvaluatorRequestor> obj = (ClrSystemHandler<IEvaluatorRequestor>)gc.Target; - obj.OnNext(new EvaluatorRequestor(clr2Java)); - } - } - - public static void Call_ClrSystemTaskMessage_OnNext(ulong handle, ITaskMessageClr2Java clr2Java, byte[] message) - { - using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemTaskMessage_OnNext")) - { - GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle); - ClrSystemHandler<ITaskMessage> obj = (ClrSystemHandler<ITaskMessage>)gc.Target; - obj.OnNext(new TaskMessage(clr2Java, message)); - } - } - - public static void Call_ClrSystemFailedTask_OnNext(ulong handle, IFailedTaskClr2Java clr2Java) - { - using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemFailedTask_OnNext")) - { - GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle); - ClrSystemHandler<IFailedTask> obj = (ClrSystemHandler<IFailedTask>)gc.Target; - obj.OnNext(new FailedTask(clr2Java)); - } - } - - public static void Call_ClrSystemRunningTask_OnNext(ulong handle, IRunningTaskClr2Java clr2Java) - { - using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemRunningTask_OnNext")) - { - GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle); - ClrSystemHandler<IRunningTask> obj = (ClrSystemHandler<IRunningTask>)gc.Target; - obj.OnNext(new RunningTask(clr2Java)); - } - } - - public static void Call_ClrSystemDriverRestartRunningTask_OnNext(ulong handle, IRunningTaskClr2Java clr2Java) - { - using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemDriverRestartRunningTask_OnNext")) - { - GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle); - ClrSystemHandler<IRunningTask> obj = (ClrSystemHandler<IRunningTask>)gc.Target; - obj.OnNext(new RunningTask(clr2Java)); - } - } - - public static void Call_ClrSystemFailedEvaluator_OnNext(ulong handle, IFailedEvaluatorClr2Java clr2Java) - { - using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemFailedEvaluator_OnNext")) - { - GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle); - ClrSystemHandler<IFailedEvaluator> obj = (ClrSystemHandler<IFailedEvaluator>)gc.Target; - obj.OnNext(new FailedEvaluator(clr2Java)); - } - } - - public static void Call_ClrSystemCompletedTask_OnNext(ulong handle, ICompletedTaskClr2Java clr2Java) - { - using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemCompletedTask_OnNext")) - { - GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle); - ClrSystemHandler<ICompletedTask> obj = (ClrSystemHandler<ICompletedTask>)gc.Target; - obj.OnNext(new CompletedTask(clr2Java)); - } - } - - public static void Call_ClrSystemSuspendedTask_OnNext(ulong handle, ISuspendedTaskClr2Java clr2Java) - { - using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemSuspendedTask_OnNext")) - { - GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle); - ClrSystemHandler<ISuspendedTask> obj = (ClrSystemHandler<ISuspendedTask>)gc.Target; - obj.OnNext(new SuspendedTask(clr2Java)); - } - } - - public static void Call_ClrSystemCompletedEvaluator_OnNext(ulong handle, ICompletedEvaluatorClr2Java clr2Java) - { - using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemCompletedEvaluator_OnNext")) - { - GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle); - ClrSystemHandler<ICompletedEvaluator> obj = (ClrSystemHandler<ICompletedEvaluator>)gc.Target; - obj.OnNext(new CompletedEvaluator(clr2Java)); - } - } - - public static void Call_ClrSystemHttpServer_OnNext(ulong handle, IHttpServerBridgeClr2Java clr2Java) - { - using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemHttpServer_OnNext")) - { - GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle); - ClrSystemHandler<IHttpMessage> obj = (ClrSystemHandler<IHttpMessage>)gc.Target; - obj.OnNext(new HttpMessage(clr2Java)); - } - } - - public static void Call_ClrSystemClosedContext_OnNext(ulong handle, IClosedContextClr2Java clr2Java) - { - using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemClosedContext_OnNext")) - { - GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle); - ClrSystemHandler<IClosedContext> obj = (ClrSystemHandler<IClosedContext>)gc.Target; - obj.OnNext(new ClosedContext(clr2Java)); - } - } - - public static void Call_ClrSystemFailedContext_OnNext(ulong handle, IFailedContextClr2Java clr2Java) - { - using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemFailedContext_OnNext")) - { - GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle); - ClrSystemHandler<IFailedContext> obj = (ClrSystemHandler<IFailedContext>)gc.Target; - obj.OnNext(new FailedContext(clr2Java)); - } - } - - public static void Call_ClrSystemContextMessage_OnNext(ulong handle, IContextMessageClr2Java clr2Java) - { - using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemContextMessage_OnNext")) - { - GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle); - ClrSystemHandler<IContextMessage> obj = (ClrSystemHandler<IContextMessage>)gc.Target; - obj.OnNext(new ContextMessage(clr2Java)); - } - } - - public static void Call_ClrSystemDriverRestart_OnNext(ulong handle) - { - using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemDriverRestart_OnNext")) - { - GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle); - ClrSystemHandler<StartTime> obj = (ClrSystemHandler<StartTime>)gc.Target; - obj.OnNext(new StartTime(DateTime.Now.Ticks)); - } - } - - //Deprecate, remove after both Java and C# code gets checked in - public static ulong[] Call_ClrSystemStartHandler_OnStart(DateTime startTime) - { - using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart")) - { - LOGGER.Log(Level.Info, "*** Start time is " + startTime); - return GetHandlers(null); - } - } - - public static ulong[] Call_ClrSystemStartHandler_OnStart(DateTime startTime, string httpServerPort) - { - using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart")) - { - LOGGER.Log(Level.Info, "*** Start time is " + startTime); - LOGGER.Log(Level.Info, "*** httpServerPort: " + httpServerPort); - return GetHandlers(httpServerPort); - } - } - - private static ulong[] GetHandlers(string httpServerPortNumber) - { - IStartHandler startHandler; - IInjector injector = null; - string errorMessage; - string bridgeConfiguration = Path.Combine(Directory.GetCurrentDirectory(), "reef", "global", Constants.DriverBridgeConfiguration); - if (!File.Exists(bridgeConfiguration)) - { - errorMessage = "Cannot find CLR Driver bridge configuration file " + bridgeConfiguration; - Exceptions.Throw(new InvalidOperationException(errorMessage), LOGGER); - } - try - { - IConfiguration driverBridgeConfiguration = new AvroConfigurationSerializer().FromFile(bridgeConfiguration); - injector = TangFactory.GetTang().NewInjector(driverBridgeConfiguration); - } - catch (Exception e) - { - errorMessage = "Failed to get injector from driver bridge configuration."; - Exceptions.CaughtAndThrow(new InvalidOperationException(errorMessage, e), Level.Error, errorMessage, LOGGER); - } - - try - { - HttpServerPort port = injector.GetInstance<HttpServerPort>(); - port.PortNumber = httpServerPortNumber == null ? 0 : int.Parse(httpServerPortNumber, CultureInfo.InvariantCulture); - - startHandler = injector.GetInstance<IStartHandler>(); - LOGGER.Log(Level.Info, "Start handler set to be " + startHandler.Identifier); - _driverBridge = injector.GetInstance<DriverBridge>(); - } - catch (Exception e) - { - Exceptions.CaughtAndThrow(e, Level.Error, "Cannot get instance.", LOGGER); - } - - return _driverBridge.Subscribe(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/bridge/DriverBridge.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/bridge/DriverBridge.cs b/lang/cs/Org.Apache.REEF.Driver/bridge/DriverBridge.cs deleted file mode 100644 index 314aa6a..0000000 --- a/lang/cs/Org.Apache.REEF.Driver/bridge/DriverBridge.cs +++ /dev/null @@ -1,321 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Common.Context; -using Org.Apache.REEF.Driver.Context; -using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Driver.Task; -using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Tang.Annotations; -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.Globalization; -using System.Linq; - -using Org.Apache.REEF.Wake.Time; - -namespace Org.Apache.REEF.Driver.Bridge -{ - public class DriverBridge - { - private static Logger _logger; - - private static ClrSystemHandler<IAllocatedEvaluator> _allocatedEvaluatorSubscriber; - - private static ClrSystemHandler<IEvaluatorRequestor> _evaluatorRequestorSubscriber; - - private static ClrSystemHandler<ITaskMessage> _taskMessageSubscriber; - - private static ClrSystemHandler<IActiveContext> _activeContextSubscriber; - - private static ClrSystemHandler<IActiveContext> _driverRestartActiveContextSubscriber; - - private static ClrSystemHandler<IFailedTask> _failedTaskSubscriber; - - private static ClrSystemHandler<IRunningTask> _runningTaskSubscriber; - - private static ClrSystemHandler<IRunningTask> _driverRestartRunningTaskSubscriber; - - private static ClrSystemHandler<ISuspendedTask> _suspendedTaskSubscriber; - - private static ClrSystemHandler<IFailedEvaluator> _failedEvaluatorSubscriber; - - private static ClrSystemHandler<ICompletedEvaluator> _completedEvaluatorSubscriber; - - private static ClrSystemHandler<IHttpMessage> _httpServerEventSubscriber; - - private static ClrSystemHandler<ICompletedTask> _completedTaskSubscriber; - - private static ClrSystemHandler<IClosedContext> _closedContextSubscriber; - - private static ClrSystemHandler<IFailedContext> _failedContextSubscriber; - - private static ClrSystemHandler<IContextMessage> _contextMessageSubscriber; - - private static ClrSystemHandler<StartTime> _driverRestartSubscriber; - - private IObserver<StartTime> _driverRestartHandler; - - private ISet<IObserver<IEvaluatorRequestor>> _evaluatorRequestHandlers; - - private ISet<IObserver<IAllocatedEvaluator>> _allocatedEvaluatorHandlers; - - private ISet<IObserver<IActiveContext>> _activeContextHandlers; - - private ISet<IObserver<IActiveContext>> _driverRestartActiveContextHandlers; - - private ISet<IObserver<ITaskMessage>> _taskMessageHandlers; - - private ISet<IObserver<IFailedTask>> _failedTaskHandlers; - - private ISet<IObserver<ISuspendedTask>> _suspendedTaskHandlers; - - private ISet<IObserver<IRunningTask>> _runningTaskHandlers; - - private ISet<IObserver<IRunningTask>> _driverRestartRunningTaskHandlers; - - private ISet<IObserver<IFailedEvaluator>> _failedEvaluatorHandlers; - - private ISet<IObserver<ICompletedEvaluator>> _completedEvaluatorHandlers; - - private ISet<IObserver<IClosedContext>> _closedContextHandlers; - - private ISet<IObserver<IFailedContext>> _failedContextHandlers; - - private ISet<IObserver<IContextMessage>> _contextMessageHandlers; - - private ISet<IObserver<ICompletedTask>> _completedTaskHandlers; - - private HttpServerHandler _httpServerHandler; - - [Inject] - public DriverBridge( - [Parameter(Value = typeof(DriverBridgeConfigurationOptions.DriverRestartHandler))] IObserver<StartTime> driverRestartHandler, - [Parameter(Value = typeof(DriverBridgeConfigurationOptions.EvaluatorRequestHandlers))] ISet<IObserver<IEvaluatorRequestor>> evaluatorRequestHandlers, - [Parameter(Value = typeof(DriverBridgeConfigurationOptions.AllocatedEvaluatorHandlers))] ISet<IObserver<IAllocatedEvaluator>> allocatedEvaluatorHandlers, - [Parameter(Value = typeof(DriverBridgeConfigurationOptions.ActiveContextHandlers))] ISet<IObserver<IActiveContext>> activeContextHandlers, - [Parameter(Value = typeof(DriverBridgeConfigurationOptions.TaskMessageHandlers))] ISet<IObserver<ITaskMessage>> taskMessageHandlers, - [Parameter(Value = typeof(DriverBridgeConfigurationOptions.FailedTaskHandlers))] ISet<IObserver<IFailedTask>> failedTaskHandlers, - [Parameter(Value = typeof(DriverBridgeConfigurationOptions.FailedEvaluatorHandlers))] ISet<IObserver<IFailedEvaluator>> failedEvaluatorHandlers, - [Parameter(Value = typeof(DriverBridgeConfigurationOptions.CompletedEvaluatorHandlers))] ISet<IObserver<ICompletedEvaluator>> completedEvaluatorHandlers, - [Parameter(Value = typeof(DriverBridgeConfigurationOptions.RunningTaskHandlers))] ISet<IObserver<IRunningTask>> runningTaskHandlers, - [Parameter(Value = typeof(DriverBridgeConfigurationOptions.CompletedTaskHandlers))] ISet<IObserver<ICompletedTask>> completedTaskHandlers, - [Parameter(Value = typeof(DriverBridgeConfigurationOptions.SuspendedTaskHandlers))] ISet<IObserver<ISuspendedTask>> suspendedTaskHandlers, - [Parameter(Value = typeof(DriverBridgeConfigurationOptions.ClosedContextHandlers))] ISet<IObserver<IClosedContext>> closedContextHandlers, - [Parameter(Value = typeof(DriverBridgeConfigurationOptions.FailedContextHandlers))] ISet<IObserver<IFailedContext>> failedContextHandlers, - [Parameter(Value = typeof(DriverBridgeConfigurationOptions.ContextMessageHandlers))] ISet<IObserver<IContextMessage>> contextMessageHandlers, - [Parameter(Value = typeof(DriverBridgeConfigurationOptions.DriverRestartActiveContextHandlers))] ISet<IObserver<IActiveContext>> driverRestartActiveContextHandlers, - [Parameter(Value = typeof(DriverBridgeConfigurationOptions.DriverRestartRunningTaskHandlers))] ISet<IObserver<IRunningTask>> driverRestartRunningTaskHandlers, - [Parameter(Value = typeof(DriverBridgeConfigurationOptions.TraceListenersSet))] ISet<TraceListener> traceListeners, - [Parameter(Value = typeof(DriverBridgeConfigurationOptions.TraceLevel))] string traceLevel, - HttpServerHandler httpServerHandler) - { - foreach (TraceListener listener in traceListeners) - { - Logger.AddTraceListner(listener); - } - _logger = Logger.GetLogger(typeof(DriverBridge)); - _logger.Log(Level.Info, "Constructing DriverBridge"); - - Level level; - if (!Enum.TryParse(traceLevel.ToString(CultureInfo.InvariantCulture), out level)) - { - _logger.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Invalid trace level {0} provided, will by default use verbose level", traceLevel)); - } - else - { - Logger.SetCustomLevel(level); - } - - _evaluatorRequestHandlers = evaluatorRequestHandlers; - _allocatedEvaluatorHandlers = allocatedEvaluatorHandlers; - _activeContextHandlers = activeContextHandlers; - _taskMessageHandlers = taskMessageHandlers; - _failedEvaluatorHandlers = failedEvaluatorHandlers; - _failedTaskHandlers = failedTaskHandlers; - _completedTaskHandlers = completedTaskHandlers; - _runningTaskHandlers = runningTaskHandlers; - _suspendedTaskHandlers = suspendedTaskHandlers; - _completedEvaluatorHandlers = completedEvaluatorHandlers; - _closedContextHandlers = closedContextHandlers; - _failedContextHandlers = failedContextHandlers; - _contextMessageHandlers = contextMessageHandlers; - _driverRestartHandler = driverRestartHandler; - _driverRestartActiveContextHandlers = driverRestartActiveContextHandlers; - _driverRestartRunningTaskHandlers = driverRestartRunningTaskHandlers; - _httpServerHandler = httpServerHandler; - - _evaluatorRequestorSubscriber = new ClrSystemHandler<IEvaluatorRequestor>(); - _allocatedEvaluatorSubscriber = new ClrSystemHandler<IAllocatedEvaluator>(); - _completedEvaluatorSubscriber = new ClrSystemHandler<ICompletedEvaluator>(); - _taskMessageSubscriber = new ClrSystemHandler<ITaskMessage>(); - _activeContextSubscriber = new ClrSystemHandler<IActiveContext>(); - _failedTaskSubscriber = new ClrSystemHandler<IFailedTask>(); - _failedEvaluatorSubscriber = new ClrSystemHandler<IFailedEvaluator>(); - _httpServerEventSubscriber = new ClrSystemHandler<IHttpMessage>(); - _completedTaskSubscriber = new ClrSystemHandler<ICompletedTask>(); - _runningTaskSubscriber = new ClrSystemHandler<IRunningTask>(); - _suspendedTaskSubscriber = new ClrSystemHandler<ISuspendedTask>(); - _closedContextSubscriber = new ClrSystemHandler<IClosedContext>(); - _failedContextSubscriber = new ClrSystemHandler<IFailedContext>(); - _contextMessageSubscriber = new ClrSystemHandler<IContextMessage>(); - _driverRestartSubscriber = new ClrSystemHandler<StartTime>(); - _driverRestartActiveContextSubscriber = new ClrSystemHandler<IActiveContext>(); - _driverRestartRunningTaskSubscriber = new ClrSystemHandler<IRunningTask>(); - } - - public ulong[] Subscribe() - { - ulong[] handlers = Enumerable.Repeat(Constants.NullHandler, Constants.HandlersNumber).ToArray(); - - // subscribe to StartTime event for driver restart - _driverRestartSubscriber.Subscribe(_driverRestartHandler); - _logger.Log(Level.Info, "subscribed to Driver restart handler: " + _driverRestartHandler); - handlers[Constants.Handlers[Constants.DriverRestartHandler]] = ClrHandlerHelper.CreateHandler(_driverRestartSubscriber); - - // subscribe to Evaluator Requestor - foreach (var handler in _evaluatorRequestHandlers) - { - _evaluatorRequestorSubscriber.Subscribe(handler); - _logger.Log(Level.Info, "subscribed to IEvaluatorRequestor handler: " + handler); - } - handlers[Constants.Handlers[Constants.EvaluatorRequestorHandler]] = ClrHandlerHelper.CreateHandler(_evaluatorRequestorSubscriber); - - // subscribe to Allocated Evaluator - foreach (var handler in _allocatedEvaluatorHandlers) - { - _allocatedEvaluatorSubscriber.Subscribe(handler); - _logger.Log(Level.Info, "subscribed to IAllocatedEvaluator handler: " + handler); - } - handlers[Constants.Handlers[Constants.AllocatedEvaluatorHandler]] = ClrHandlerHelper.CreateHandler(_allocatedEvaluatorSubscriber); - - // subscribe to TaskMessage - foreach (var handler in _taskMessageHandlers) - { - _taskMessageSubscriber.Subscribe(handler); - _logger.Log(Level.Info, "subscribed to ITaskMessage handler: " + handler); - } - handlers[Constants.Handlers[Constants.TaskMessageHandler]] = ClrHandlerHelper.CreateHandler(_taskMessageSubscriber); - - // subscribe to Active Context - foreach (var handler in _activeContextHandlers) - { - _activeContextSubscriber.Subscribe(handler); - _logger.Log(Level.Info, "subscribed to IActiveContext handler: " + handler); - } - handlers[Constants.Handlers[Constants.ActiveContextHandler]] = ClrHandlerHelper.CreateHandler(_activeContextSubscriber); - - // subscribe to Failed Task - foreach (var handler in _failedTaskHandlers) - { - _failedTaskSubscriber.Subscribe(handler); - _logger.Log(Level.Info, "subscribed to IFailedTask handler: " + handler); - } - handlers[Constants.Handlers[Constants.FailedTaskHandler]] = ClrHandlerHelper.CreateHandler(_failedTaskSubscriber); - - // subscribe to Running Task - foreach (var handler in _runningTaskHandlers) - { - _runningTaskSubscriber.Subscribe(handler); - _logger.Log(Level.Info, "subscribed to IRunningask handler: " + handler); - } - handlers[Constants.Handlers[Constants.RunningTaskHandler]] = ClrHandlerHelper.CreateHandler(_runningTaskSubscriber); - - // subscribe to Completed Task - foreach (var handler in _completedTaskHandlers) - { - _completedTaskSubscriber.Subscribe(handler); - _logger.Log(Level.Info, "subscribed to ICompletedTask handler: " + handler); - } - handlers[Constants.Handlers[Constants.CompletedTaskHandler]] = ClrHandlerHelper.CreateHandler(_completedTaskSubscriber); - - // subscribe to Suspended Task - foreach (var handler in _suspendedTaskHandlers) - { - _suspendedTaskSubscriber.Subscribe(handler); - _logger.Log(Level.Info, "subscribed to ISuspendedTask handler: " + handler); - } - handlers[Constants.Handlers[Constants.SuspendedTaskHandler]] = ClrHandlerHelper.CreateHandler(_suspendedTaskSubscriber); - - // subscribe to Failed Evaluator - foreach (var handler in _failedEvaluatorHandlers) - { - _failedEvaluatorSubscriber.Subscribe(handler); - _logger.Log(Level.Info, "subscribed to IFailedEvaluator handler: " + handler); - } - handlers[Constants.Handlers[Constants.FailedEvaluatorHandler]] = ClrHandlerHelper.CreateHandler(_failedEvaluatorSubscriber); - - // subscribe to Completed Evaluator - foreach (var handler in _completedEvaluatorHandlers) - { - _completedEvaluatorSubscriber.Subscribe(handler); - _logger.Log(Level.Info, "subscribed to ICompletedEvaluator handler: " + handler); - } - handlers[Constants.Handlers[Constants.CompletedEvaluatorHandler]] = ClrHandlerHelper.CreateHandler(_completedEvaluatorSubscriber); - - // subscribe to Closed Context - foreach (var handler in _closedContextHandlers) - { - _closedContextSubscriber.Subscribe(handler); - _logger.Log(Level.Info, "subscribed to IClosedContext handler: " + handler); - } - handlers[Constants.Handlers[Constants.ClosedContextHandler]] = ClrHandlerHelper.CreateHandler(_closedContextSubscriber); - - // subscribe to Failed Context - foreach (var handler in _failedContextHandlers) - { - _failedContextSubscriber.Subscribe(handler); - _logger.Log(Level.Info, "subscribed to IFailedContext handler: " + handler); - } - handlers[Constants.Handlers[Constants.FailedContextHandler]] = ClrHandlerHelper.CreateHandler(_failedContextSubscriber); - - // subscribe to Context Message - foreach (var handler in _contextMessageHandlers) - { - _contextMessageSubscriber.Subscribe(handler); - _logger.Log(Level.Info, "subscribed to IContextMesage handler: " + handler); - } - handlers[Constants.Handlers[Constants.ContextMessageHandler]] = ClrHandlerHelper.CreateHandler(_contextMessageSubscriber); - - // subscribe to Active Context received during driver restart - foreach (var handler in _driverRestartActiveContextHandlers) - { - _driverRestartActiveContextSubscriber.Subscribe(handler); - _logger.Log(Level.Info, "subscribed to handler for IActiveContext received during driver restart: " + handler); - } - handlers[Constants.Handlers[Constants.DriverRestartActiveContextHandler]] = ClrHandlerHelper.CreateHandler(_driverRestartActiveContextSubscriber); - - // subscribe to Running Task received during driver restart - foreach (var handler in _driverRestartRunningTaskHandlers) - { - _driverRestartRunningTaskSubscriber.Subscribe(handler); - _logger.Log(Level.Info, "subscribed to handler for IRunningTask received during driver restart: " + handler); - } - handlers[Constants.Handlers[Constants.DriverRestartRunningTaskHandler]] = ClrHandlerHelper.CreateHandler(_driverRestartRunningTaskSubscriber); - - // subscribe to Http message - _httpServerEventSubscriber.Subscribe(_httpServerHandler); - _logger.Log(Level.Info, "subscribed to IHttpMessage handler :" + _httpServerHandler); - handlers[Constants.Handlers[Constants.HttpServerHandler]] = ClrHandlerHelper.CreateHandler(_httpServerEventSubscriber); - - return handlers; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/bridge/DriverBridgeConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/bridge/DriverBridgeConfiguration.cs b/lang/cs/Org.Apache.REEF.Driver/bridge/DriverBridgeConfiguration.cs deleted file mode 100644 index 047ba5d..0000000 --- a/lang/cs/Org.Apache.REEF.Driver/bridge/DriverBridgeConfiguration.cs +++ /dev/null @@ -1,273 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Common.Context; -using Org.Apache.REEF.Common.Evaluator; -using Org.Apache.REEF.Driver.Context; -using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Driver.Task; -using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Tang.Formats; -using Org.Apache.REEF.Tang.Util; -using Org.Apache.REEF.Wake.Time; -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.Diagnostics.CodeAnalysis; -using System.Globalization; - -[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1401:FieldsMustBePrivate", Justification = "static field, typical usage in configurations")] - -namespace Org.Apache.REEF.Driver.Bridge -{ - public class DriverBridgeConfiguration : ConfigurationModuleBuilder - { - /// <summary> - /// The event handler invoked right after the driver boots up. - /// </summary> - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly RequiredImpl<IStartHandler> OnDriverStarted = new RequiredImpl<IStartHandler>(); - - /// <summary> - /// The event handler invoked when driver restarts - /// </summary> - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly OptionalImpl<IObserver<StartTime>> OnDriverRestarted = new OptionalImpl<IObserver<StartTime>>(); - - /// <summary> - /// The event handler for requesting evaluator - /// </summary> - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly OptionalImpl<IObserver<IEvaluatorRequestor>> OnEvaluatorRequested = new OptionalImpl<IObserver<IEvaluatorRequestor>>(); - - /// <summary> - /// Event handler for allocated evaluators. Defaults to returning the evaluator if not bound. - /// </summary> - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly OptionalImpl<IObserver<IAllocatedEvaluator>> OnEvaluatorAllocated = new OptionalImpl<IObserver<IAllocatedEvaluator>>(); - - /// <summary> - /// Event handler for completed evaluators. Defaults to logging if not bound. - /// </summary> - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly OptionalImpl<IObserver<ICompletedEvaluator>> OnEvaluatorCompleted = new OptionalImpl<IObserver<ICompletedEvaluator>>(); - - /// <summary> - /// Event handler for failed evaluators. Defaults to job failure if not bound. - /// </summary> - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly OptionalImpl<IObserver<IFailedEvaluator>> OnEvaluatorFailed = new OptionalImpl<IObserver<IFailedEvaluator>>(); - - /// <summary> - /// Event handler for failed evaluators. Defaults to job failure if not bound. - /// </summary> - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly OptionalImpl<IHttpHandler> OnHttpEvent = new OptionalImpl<IHttpHandler>(); - - /// <summary> - /// Event handler for task messages. Defaults to logging if not bound. - /// </summary> - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly OptionalImpl<IObserver<ITaskMessage>> OnTaskMessage = new OptionalImpl<IObserver<ITaskMessage>>(); - - /// <summary> - /// Event handler for completed tasks. Defaults to closing the context the task ran on if not bound. - /// </summary> - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly OptionalImpl<IObserver<ICompletedTask>> OnTaskCompleted = new OptionalImpl<IObserver<ICompletedTask>>(); - - /// <summary> - /// Event handler for failed tasks. Defaults to job failure if not bound. - /// </summary> - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly OptionalImpl<IObserver<IFailedTask>> OnTaskFailed = new OptionalImpl<IObserver<IFailedTask>>(); - - ///// <summary> - ///// Event handler for running tasks. Defaults to logging if not bound. - ///// </summary> - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly OptionalImpl<IObserver<IRunningTask>> OnTaskRunning = new OptionalImpl<IObserver<IRunningTask>>(); - - ///// <summary> - ///// Event handler for running task received during driver restart. Defaults to logging if not bound. - ///// </summary> - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly OptionalImpl<IObserver<IRunningTask>> OnDriverRestartTaskRunning = new OptionalImpl<IObserver<IRunningTask>>(); - - /// <summary> - /// Event handler for suspended tasks. Defaults to job failure if not bound. Rationale: many jobs don't support - /// task suspension. Hence, this parameter should be optional. The only sane default is to crash the job, then. - /// </summary> - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly OptionalImpl<IObserver<ISuspendedTask>> OnTaskSuspended = new OptionalImpl<IObserver<ISuspendedTask>>(); - - /// <summary> - /// Event handler for active context. Defaults to closing the context if not bound. - /// </summary> - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly OptionalImpl<IObserver<IActiveContext>> OnContextActive = new OptionalImpl<IObserver<IActiveContext>>(); - - /// <summary> - /// Event handler for active context received during driver restart. Defaults to closing the context if not bound. - /// </summary> - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly OptionalImpl<IObserver<IActiveContext>> OnDirverRestartContextActive = new OptionalImpl<IObserver<IActiveContext>>(); - - /// <summary> - /// Event handler for closed context. Defaults to logging if not bound. - /// </summary> - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly OptionalImpl<IObserver<IClosedContext>> OnContextClosed = new OptionalImpl<IObserver<IClosedContext>>(); - - /// <summary> - /// Event handler for closed context. Defaults to job failure if not bound. - /// </summary> - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly OptionalImpl<IObserver<IFailedContext>> OnContextFailed = new OptionalImpl<IObserver<IFailedContext>>(); - - /// <summary> - /// Event handler for context messages. Defaults to logging if not bound. - /// </summary> - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly OptionalImpl<IObserver<IContextMessage>> OnContextMessage = new OptionalImpl<IObserver<IContextMessage>>(); - - /// <summary> - /// Additional set of string arguments that can be pssed to handlers through client - /// </summary> - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly OptionalParameter<string> CommandLineArguments = new OptionalParameter<string>(); - - /// <summary> - /// The trace level of the TraceListner - /// </summary> - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly OptionalParameter<string> CustomTraceLevel = new OptionalParameter<string>(); - - /// <summary> - /// Additional set of trace listners provided by client - /// </summary> - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly OptionalParameter<TraceListener> CustomTraceListeners = new OptionalParameter<TraceListener>(); - - /// <summary> - /// The implemenation for (attempting to) re-establish connection to driver - /// </summary> - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly OptionalImpl<IDriverConnection> OnDriverReconnect = new OptionalImpl<IDriverConnection>(); - - // This is currently not needed in Bridge/Driver model - ///// <summary> - ///// The event handler invoked right before the driver shuts down. Defaults to ignore. - ///// </summary> - //public static readonly OptionalImpl<IObserver<StopTime>> OnDriverStop = new OptionalImpl<IObserver<StopTime>>(); - - // Client handlers only needed when client interactions are expeceted. Not enabled for now. - ///// <summary> - ///// Event handler for client messages. Defaults to logging if not bound. - ///// </summary> - //public static readonly OptionalImpl<IObserver<byte[]>> OnClientMessage = new OptionalImpl<IObserver<byte[]>>(); - - // Client handlers only needed when client interactions are expeceted. Not enabled for now. - ///// <summary> - ///// Event handler for close messages sent by the client. Defaults to job failure if not bound. - ///// Note: in java the type is void, but IObserver does not take void as a type - ///// </summary> - //public static readonly OptionalImpl<IObserver<byte[]>> OnClientClosed = new OptionalImpl<IObserver<byte[]>>(); - - // Client handlers only needed when client interactions are expeceted. Not enabled for now. - ///// <summary> - ///// Event handler for close messages sent by the client. Defaults to job failure if not bound. - ///// </summary> - //public static readonly OptionalImpl<IObserver<byte[]>> OnClientClosedMessage = new OptionalImpl<IObserver<byte[]>>(); - - public static ConfigurationModule ConfigurationModule - { - get - { - return new DriverBridgeConfiguration() - .BindImplementation(GenericType<IStartHandler>.Class, OnDriverStarted) - .BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.DriverRestartHandler>.Class, OnDriverRestarted) - .BindImplementation(GenericType<IDriverConnection>.Class, OnDriverReconnect) - .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.EvaluatorRequestHandlers>.Class, OnEvaluatorRequested) - .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.AllocatedEvaluatorHandlers>.Class, OnEvaluatorAllocated) - .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.ActiveContextHandlers>.Class, OnContextActive) - .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.TaskMessageHandlers>.Class, OnTaskMessage) - .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.FailedTaskHandlers>.Class, OnTaskFailed) - .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.RunningTaskHandlers>.Class, OnTaskRunning) - .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.SuspendedTaskHandlers>.Class, OnTaskSuspended) - .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.FailedEvaluatorHandlers>.Class, OnEvaluatorFailed) - .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.CompletedEvaluatorHandlers>.Class, OnEvaluatorCompleted) - .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.CompletedTaskHandlers>.Class, OnTaskCompleted) - .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.ClosedContextHandlers>.Class, OnContextClosed) - .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.FailedContextHandlers>.Class, OnContextFailed) - .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.ContextMessageHandlers>.Class, OnContextMessage) - .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.ArgumentSets>.Class, CommandLineArguments) - .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.HttpEventHandlers>.Class, OnHttpEvent) - .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.TraceListenersSet>.Class, CustomTraceListeners) - .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartActiveContextHandlers>.Class, OnDirverRestartContextActive) - .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartRunningTaskHandlers>.Class, OnDriverRestartTaskRunning) - .BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.TraceLevel>.Class, CustomTraceLevel) - .Build(); - } - } - } - - public class CommandLineArguments - { - [Inject] - public CommandLineArguments([Parameter(typeof(DriverBridgeConfigurationOptions.ArgumentSets))] ISet<string> arguments) - { - Arguments = arguments; - } - - public ISet<string> Arguments { get; set; } - } - - public class CustomTraceListeners - { - [Inject] - public CustomTraceListeners([Parameter(typeof(DriverBridgeConfigurationOptions.TraceListenersSet))] ISet<TraceListener> listeners) - { - Listeners = listeners; - } - - public ISet<TraceListener> Listeners { get; set; } - } - - public class CustomTraceLevel - { - [Inject] - public CustomTraceLevel([Parameter(typeof(DriverBridgeConfigurationOptions.TraceLevel))] string traceLevel) - { - Level level = Level.Verbose; - if (Enum.TryParse(traceLevel.ToString(CultureInfo.InvariantCulture), out level)) - { - Logger.SetCustomLevel(level); - } - else - { - Console.WriteLine("Cannot parse trace level" + traceLevel); - } - TraceLevel = level; - } - - public Level TraceLevel { get; set; } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/bridge/DriverBridgeConfigurationOptions.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/bridge/DriverBridgeConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Driver/bridge/DriverBridgeConfigurationOptions.cs deleted file mode 100644 index c2f4bfd..0000000 --- a/lang/cs/Org.Apache.REEF.Driver/bridge/DriverBridgeConfigurationOptions.cs +++ /dev/null @@ -1,154 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Common.Context; -using Org.Apache.REEF.Driver.Context; -using Org.Apache.REEF.Driver.Defaults; -using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Driver.Task; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Wake.Time; -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.Diagnostics.CodeAnalysis; - -[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1402:FileMayOnlyContainASingleClass", Justification = "allow name parameter class to be embedded")] - -namespace Org.Apache.REEF.Driver.Bridge -{ - /// <summary> - /// Hosts all named parameters for Drivers, including bridge handlers. - /// </summary> - public class DriverBridgeConfigurationOptions - { - // Level.Verbose (since enum is not suppoted for TANG, we use a string here) - private const string _verboseLevel = "Verbose"; - - [NamedParameter(documentation: "Called when driver is restarted, after CLR bridge is set up.", defaultClasses: new[] { typeof(DefaultDriverRestartHandler) })] - public class DriverRestartHandler : Name<IObserver<StartTime>> - { - } - - [NamedParameter(documentation: "Called when evaluator is requested.", defaultClasses: new[] { typeof(DefaultEvaluatorRequestorHandler) })] - public class EvaluatorRequestHandlers : Name<ISet<IObserver<IEvaluatorRequestor>>> - { - } - - [NamedParameter(documentation: "Called when an exception occurs on a running evaluator.", defaultClasses: new[] { typeof(DefaultEvaluatorFailureHandler) })] - public class FailedEvaluatorHandlers : Name<ISet<IObserver<IFailedEvaluator>>> - { - } - - [NamedParameter(documentation: "Called when an evaluator completes.", defaultClasses: new[] { typeof(DefaultEvaluatorCompletionHandler) })] - public class CompletedEvaluatorHandlers : Name<ISet<IObserver<ICompletedEvaluator>>> - { - } - - [NamedParameter(documentation: "Called when an allocated evaluator is given to the client.", defaultClasses: new[] { typeof(DefaultEvaluatorAllocationHandler) })] - public class AllocatedEvaluatorHandlers : Name<ISet<IObserver<IAllocatedEvaluator>>> - { - } - - [NamedParameter(documentation: "Running task handler.", defaultClasses: new[] { typeof(DefaultTaskRunningHandler) })] - public class RunningTaskHandlers : Name<ISet<IObserver<IRunningTask>>> - { - } - - [NamedParameter(documentation: "Running task during driver restart handler.", defaultClasses: new[] { typeof(DefaultDriverRestartTaskRunningHandler) })] - public class DriverRestartRunningTaskHandlers : Name<ISet<IObserver<IRunningTask>>> - { - } - - [NamedParameter(documentation: "Task exception handler.", defaultClasses: new[] { typeof(DefaultTaskFailureHandler) })] - public class FailedTaskHandlers : Name<ISet<IObserver<IFailedTask>>> - { - } - - [NamedParameter(documentation: "Task message handler.", defaultClasses: new[] { typeof(DefaultTaskMessageHandler) })] - public class TaskMessageHandlers : Name<ISet<IObserver<ITaskMessage>>> - { - } - - [NamedParameter(documentation: "Http Event Handlers.", defaultClasses: new[] { typeof(DefaultHttpHandler) })] - public class HttpEventHandlers : Name<ISet<IHttpHandler>> - { - } - - [NamedParameter(documentation: "Completed task handler.", defaultClasses: new[] { typeof(DefaultTaskCompletionHandler) })] - public class CompletedTaskHandlers : Name<ISet<IObserver<ICompletedTask>>> - { - } - - [NamedParameter(documentation: "Suspended task handler.", defaultClasses: new[] { typeof(DefaultTaskSuspensionHandler) })] - public class SuspendedTaskHandlers : Name<ISet<IObserver<ISuspendedTask>>> - { - } - - [NamedParameter(documentation: "Handler for IActiveContext.", defaultClasses: new[] { typeof(DefaultContextActiveHandler) })] - public class ActiveContextHandlers : Name<ISet<IObserver<IActiveContext>>> - { - } - - [NamedParameter(documentation: "Handler for IActiveContext received during driver restart.", defaultClasses: new[] { typeof(DefaultDriverRestartContextActiveHandler) })] - public class DriverRestartActiveContextHandlers : Name<ISet<IObserver<IActiveContext>>> - { - } - - [NamedParameter(documentation: "Handler for ClosedContext.", defaultClasses: new[] { typeof(DefaultContextClosureHandler) })] - public class ClosedContextHandlers : Name<ISet<IObserver<IClosedContext>>> - { - } - - [NamedParameter(documentation: "Handler for FailedContext.", defaultClasses: new[] { typeof(DefaultContextFailureHandler) })] - public class FailedContextHandlers : Name<ISet<IObserver<IFailedContext>>> - { - } - - [NamedParameter(documentation: "Handler for ContextMessage.", defaultClasses: new[] { typeof(DefaultContextMessageHandler) })] - public class ContextMessageHandlers : Name<ISet<IObserver<IContextMessage>>> - { - } - - [NamedParameter("Command Line Arguments supplied by client", "CommandLineArguments", null)] - public class ArgumentSets : Name<ISet<string>> - { - } - - [NamedParameter("Additional trace listners supplied by client", "TraceListeners", null, defaultClasses: new[] { typeof(DefaultCustomTraceListener) })] - public class TraceListenersSet : Name<ISet<TraceListener>> - { - } - - [NamedParameter("Custom Trace Level", "TraceLevel", defaultValue: _verboseLevel)] - public class TraceLevel : Name<string> - { - } - - //[NamedParameter(documentation: "Job message handler.", defaultClasses: new[] { typeof(DefaultClientMessageHandler) })] - //public class ClientMessageHandlers : Name<ISet<IObserver<byte[]>>> - //{ - //} - - //[NamedParameter(documentation: "Client close handler.", defaultClasses: new[] { typeof(DefaultClientCloseHandler) })] - //public class ClientCloseHandlers : Name<ISet<IObserver<byte[]>>> - //{ - //} - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/bridge/HttpMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/bridge/HttpMessage.cs b/lang/cs/Org.Apache.REEF.Driver/bridge/HttpMessage.cs deleted file mode 100644 index 7aa1a33..0000000 --- a/lang/cs/Org.Apache.REEF.Driver/bridge/HttpMessage.cs +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System.Runtime.Serialization; - -namespace Org.Apache.REEF.Driver.Bridge -{ - [DataContract] - public class HttpMessage : IHttpMessage - { - public HttpMessage(IHttpServerBridgeClr2Java httpServerBridgeClr2Java) - { - HttpServerBridgeClr2Java = httpServerBridgeClr2Java; - } - - [DataMember] - private IHttpServerBridgeClr2Java HttpServerBridgeClr2Java { get; set; } - - public string GetRequestString() - { - return HttpServerBridgeClr2Java.GetQueryString(); - } - - public void SetQueryResult(string responseString) - { - HttpServerBridgeClr2Java.SetQueryResult(responseString); - } - - public byte[] GetQueryReuestData() - { - return HttpServerBridgeClr2Java.GetQueryRequestData(); - } - - public void SetQueryResponseData(byte[] responseData) - { - HttpServerBridgeClr2Java.SetQueryResponseData(responseData); - } - - public void SetUriSpecification(string uriSpecification) - { - HttpServerBridgeClr2Java.SetUriSpecification(uriSpecification); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/bridge/HttpServerHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/bridge/HttpServerHandler.cs b/lang/cs/Org.Apache.REEF.Driver/bridge/HttpServerHandler.cs deleted file mode 100644 index c6691e7..0000000 --- a/lang/cs/Org.Apache.REEF.Driver/bridge/HttpServerHandler.cs +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Common.Avro; -using Org.Apache.REEF.Driver.bridge; -using Org.Apache.REEF.Utilities; -using Org.Apache.REEF.Utilities.Diagnostics; -using Org.Apache.REEF.Utilities.Logging; -using System; -using System.Collections.Generic; -using System.Globalization; -using System.Linq; -using System.Net; -using Org.Apache.REEF.Tang.Annotations; - -namespace Org.Apache.REEF.Driver.Bridge -{ - /// <summary> - /// HttpServerHandler, the handler for all CLR http events - /// </summary> - public class HttpServerHandler : IObserver<IHttpMessage> - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(HttpServerHandler)); - - private static readonly string SPEC = "SPEC"; - - private IDictionary<string, IHttpHandler> eventHandlers = new Dictionary<string, IHttpHandler>(); - - private HttpServerPort httpServerPort; - - /// <summary> - /// Initializes a new instance of the <see cref="HttpServerHandler" /> class. - /// </summary> - /// <param name="httpEventHandlers">The HTTP event handlers.</param> - /// <param name="httpServerPort">The HTTP server port.</param> - [Inject] - public HttpServerHandler([Parameter(Value = typeof(DriverBridgeConfigurationOptions.HttpEventHandlers))] ISet<IHttpHandler> httpEventHandlers, - HttpServerPort httpServerPort) - { - LOGGER.Log(Level.Info, "Constructing HttpServerHandler"); - foreach (var h in httpEventHandlers) - { - string spec = h.GetSpecification(); - if (spec.Contains(":")) - { - Exceptions.Throw(new ArgumentException("spec cannot contain :"), "The http spec given is " + spec, LOGGER); - } - LOGGER.Log(Level.Info, "HttpHandler spec:" + spec); - eventHandlers.Add(spec.ToLower(CultureInfo.CurrentCulture), h); - } - this.httpServerPort = httpServerPort; - } - - /// <summary> - /// Called when receving an http request from Java side - /// </summary> - /// <param name="httpMessage">The HTTP message.</param> - public void OnNext(IHttpMessage httpMessage) - { - LOGGER.Log(Level.Info, "HttpHandler OnNext is called"); - string requestString = httpMessage.GetRequestString(); - - if (requestString != null && requestString.Equals(SPEC)) - { - LOGGER.Log(Level.Info, "HttpHandler OnNext, requestString:" + requestString); - LOGGER.Log(Level.Info, "HttpHandler OnNext, port number:" + httpServerPort.PortNumber); - - httpMessage.SetUriSpecification(GetAllSpecifications()); - } - else - { - LOGGER.Log(Level.Info, "HttpHandler OnNext, handling http request."); - byte[] byteData = httpMessage.GetQueryReuestData(); - AvroHttpRequest avroHttpRequest = AvroHttpSerializer.FromBytes(byteData); - LOGGER.Log(Level.Info, "HttpHandler OnNext, requestData:" + avroHttpRequest); - - string spec = GetSpecification(avroHttpRequest.PathInfo); - if (spec != null) - { - LOGGER.Log(Level.Info, "HttpHandler OnNext, target:" + spec); - ReefHttpRequest request = ToHttpRequest(avroHttpRequest); - ReefHttpResponse response = new ReefHttpResponse(); - - IHttpHandler handler; - eventHandlers.TryGetValue(spec.ToLower(CultureInfo.CurrentCulture), out handler); - - byte[] responseData; - if (handler != null) - { - LOGGER.Log(Level.Info, "HttpHandler OnNext, get eventHandler:" + handler.GetSpecification()); - handler.OnHttpRequest(request, response); - responseData = response.OutputStream; - } - else - { - responseData = - ByteUtilities.StringToByteArrays(string.Format(CultureInfo.CurrentCulture, - "No event handler found at CLR side for {0}.", - spec)); - } - httpMessage.SetQueryResponseData(responseData); - } - } - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - - private string GetAllSpecifications() - { - return string.Join(":", eventHandlers.Keys.ToArray()); - } - - private string GetSpecification(string requestUri) - { - if (requestUri != null) - { - string[] parts = requestUri.Split('/'); - - if (parts.Length > 1) - { - return parts[1]; - } - } - return null; - } - - private ReefHttpRequest ToHttpRequest(AvroHttpRequest avroRequest) - { - ReefHttpRequest httpRequest = new ReefHttpRequest(); - httpRequest.PathInfo = avroRequest.PathInfo; - httpRequest.InputStream = avroRequest.InputStream; - httpRequest.Url = avroRequest.RequestUrl; - httpRequest.Querystring = avroRequest.QueryString; - - HttpMethod m; - HttpMethod.TryParse(avroRequest.HttpMethod, true, out m); - httpRequest.Method = m; - return httpRequest; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/bridge/HttpServerPort.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/bridge/HttpServerPort.cs b/lang/cs/Org.Apache.REEF.Driver/bridge/HttpServerPort.cs deleted file mode 100644 index 50fb915..0000000 --- a/lang/cs/Org.Apache.REEF.Driver/bridge/HttpServerPort.cs +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Tang.Annotations; - -namespace Org.Apache.REEF.Driver.bridge -{ - public class HttpServerPort - { - [Inject] - public HttpServerPort() - { - } - - public int PortNumber { get; set; } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/bridge/IHttpHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/bridge/IHttpHandler.cs b/lang/cs/Org.Apache.REEF.Driver/bridge/IHttpHandler.cs deleted file mode 100644 index 409c974..0000000 --- a/lang/cs/Org.Apache.REEF.Driver/bridge/IHttpHandler.cs +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Driver.Bridge; - -namespace Org.Apache.REEF.Driver.Bridge -{ - public interface IHttpHandler - { - /// <summary> - /// Define the specification of the handler. ":" is not allowed in the specification. - /// </summary> - /// <returns>string specification</returns> - string GetSpecification(); - - /// <summary> - /// Called when Http request is sent - /// </summary> - /// <param name="requet">The requet.</param> - /// <param name="resonse">The resonse.</param> - void OnHttpRequest(ReefHttpRequest requet, ReefHttpResponse resonse); - } -}
