http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7824be83/lang/cs/Org.Apache.REEF.Examples.HelloCLRBridge/Properties/AssemblyInfo.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloCLRBridge/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Examples.HelloCLRBridge/Properties/AssemblyInfo.cs deleted file mode 100644 index 60540a2..0000000 --- a/lang/cs/Org.Apache.REEF.Examples.HelloCLRBridge/Properties/AssemblyInfo.cs +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System.Reflection; -using System.Runtime.InteropServices; - -// General Information about an assembly is controlled through the following -// set of attributes. Change these attribute values to modify the information -// associated with an assembly. -[assembly: AssemblyTitle("Org.Apache.REEF.Examples.HelloCLRBridge")] -[assembly: AssemblyDescription("")] -[assembly: AssemblyConfiguration("")] -[assembly: AssemblyCompany("")] -[assembly: AssemblyProduct("Org.Apache.REEF.Examples.HelloCLRBridge")] -[assembly: AssemblyCopyright("Copyright © 2015")] -[assembly: AssemblyTrademark("")] -[assembly: AssemblyCulture("")] - -// Setting ComVisible to false makes the types in this assembly not visible -// to COM components. If you need to access a type in this assembly from -// COM, set the ComVisible attribute to true on that type. -[assembly: ComVisible(false)] - -// The following GUID is for the ID of the typelib if this project is exposed to COM -[assembly: Guid("2c286656-9b3f-42f9-a29f-3307ebfc8022")] - -// Version information for an assembly consists of the following four values: -// -// Major Version -// Minor Version -// Build Number -// Revision -// -// You can specify all the values or you can default the Build and Revision Numbers -// by using the '*' as shown below: -// [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.0.0.0")] -[assembly: AssemblyFileVersion("1.0.0.0")]
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7824be83/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/AnotherHelloAllocatedEvaluatorHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/AnotherHelloAllocatedEvaluatorHandler.cs b/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/AnotherHelloAllocatedEvaluatorHandler.cs deleted file mode 100644 index 2145d4c..0000000 --- a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/AnotherHelloAllocatedEvaluatorHandler.cs +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Tang.Annotations; - -namespace Org.Apache.REEF.Examples.HelloCLRBridge.Handlers -{ - public class AnotherHelloAllocatedEvaluatorHandler : IObserver<IAllocatedEvaluator> - { - [Inject] - public AnotherHelloAllocatedEvaluatorHandler() - { - } - - public void OnNext(IAllocatedEvaluator allocatedEvaluator) - { - Console.WriteLine("I am just here for the ride."); - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7824be83/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloActiveContextHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloActiveContextHandler.cs b/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloActiveContextHandler.cs deleted file mode 100644 index 4bf8c47..0000000 --- a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloActiveContextHandler.cs +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Globalization; -using Org.Apache.REEF.Common.Tasks; -using Org.Apache.REEF.Driver.Context; -using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Examples.Tasks.HelloTask; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Tang.Util; - -namespace Org.Apache.REEF.Examples.HelloCLRBridge.Handlers -{ - public class HelloActiveContextHandler : IObserver<IActiveContext> - { - [Inject] - public HelloActiveContextHandler() - { - } - - public void OnNext(IActiveContext activeContext) - { - Console.WriteLine( - string.Format( - CultureInfo.InvariantCulture, - "Active context {0} received from evaluator {1}", - activeContext.Id, - activeContext.EvaluatorId)); - - IEvaluatorDescriptor evaluatorDescriptor = activeContext.EvaluatorDescriptor; - string ipAddress = evaluatorDescriptor.NodeDescriptor.InetSocketAddress.Address.ToString(); - int port = evaluatorDescriptor.NodeDescriptor.InetSocketAddress.Port; - string hostName = evaluatorDescriptor.NodeDescriptor.HostName; - - Console.WriteLine( - string.Format( - CultureInfo.InvariantCulture, - "The running evaluator is assigned with {0} MB of memory and is running at ip: {1} and port {2}, with hostname {3}", - evaluatorDescriptor.Memory, - ipAddress, - port, - hostName)); - - IConfiguration taskConfiguration = TaskConfiguration.ConfigurationModule - .Set(TaskConfiguration.Identifier, "bridgeCLRHelloTask_" + DateTime.Now.Ticks) - .Set(TaskConfiguration.Task, GenericType<HelloTask>.Class) - .Set(TaskConfiguration.OnMessage, GenericType<HelloTask.HelloDriverMessageHandler>.Class) - .Set(TaskConfiguration.OnSendMessage, GenericType<HelloTaskMessage>.Class) - .Build(); - - activeContext.SubmitTask(taskConfiguration); - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7824be83/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloAllocatedEvaluatorHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloAllocatedEvaluatorHandler.cs b/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloAllocatedEvaluatorHandler.cs deleted file mode 100644 index 8b13426..0000000 --- a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloAllocatedEvaluatorHandler.cs +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Collections.Generic; -using System.Globalization; -using System.Linq; -using System.Net; -using Org.Apache.REEF.Common.Io; -using Org.Apache.REEF.Common.Services; -using Org.Apache.REEF.Common.Tasks; -using Org.Apache.REEF.Driver.Bridge; -using Org.Apache.REEF.Driver.Context; -using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Examples.Tasks.HelloTask; -using Org.Apache.REEF.Network.Naming; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Tang.Implementations.Configuration; -using Org.Apache.REEF.Tang.Implementations.Tang; -using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Tang.Util; -using Org.Apache.REEF.Utilities; - -namespace Org.Apache.REEF.Examples.HelloCLRBridge.Handlers -{ - public class HelloAllocatedEvaluatorHandler : IObserver<IAllocatedEvaluator> - { - [Inject] - public HelloAllocatedEvaluatorHandler() - { - } - - public void OnNext(IAllocatedEvaluator allocatedEvaluator) - { - string control = string.Empty; - - ISet<string> arguments = ClrHandlerHelper.GetCommandLineArguments(); - - if (arguments != null && arguments.Any()) - { - foreach (string argument in arguments) - { - Console.WriteLine("testing argument: " + argument); - } - - control = arguments.Last(); - } - - IEvaluatorDescriptor descriptor = allocatedEvaluator.GetEvaluatorDescriptor(); - - IConfiguration serviceConfiguration = ServiceConfiguration.ConfigurationModule - .Set(ServiceConfiguration.Services, GenericType<HelloService>.Class) - .Build(); - - IConfiguration contextConfiguration = ContextConfiguration.ConfigurationModule - .Set(ContextConfiguration.Identifier, "bridgeHelloCLRContextId_" + Guid.NewGuid().ToString("N")) - .Build(); - - IConfiguration taskConfiguration = TaskConfiguration.ConfigurationModule - .Set(TaskConfiguration.Identifier, "bridgeHelloCLRTaskId_" + Guid.NewGuid().ToString("N")) - .Set(TaskConfiguration.Task, GenericType<HelloTask>.Class) - .Set(TaskConfiguration.OnMessage, GenericType<HelloTask.HelloDriverMessageHandler>.Class) - .Set(TaskConfiguration.OnSendMessage, GenericType<HelloTaskMessage>.Class) - .Build(); - - IConfiguration mergedTaskConfiguration = taskConfiguration; - - if (allocatedEvaluator.NameServerInfo != null) - { - IPEndPoint nameServerEndpoint = NetUtilities.ParseIpEndpoint(allocatedEvaluator.NameServerInfo); - - IConfiguration nameClientConfiguration = TangFactory.GetTang().NewConfigurationBuilder( - NamingConfiguration.ConfigurationModule - .Set(NamingConfiguration.NameServerAddress, nameServerEndpoint.Address.ToString()) - .Set(NamingConfiguration.NameServerPort, - nameServerEndpoint.Port.ToString(CultureInfo.InvariantCulture)) - .Build()) - .BindImplementation(GenericType<INameClient>.Class, - GenericType<NameClient>.Class) - .Build(); - - mergedTaskConfiguration = Configurations.Merge(taskConfiguration, nameClientConfiguration); - } - - string ipAddress = descriptor.NodeDescriptor.InetSocketAddress.Address.ToString(); - int port = descriptor.NodeDescriptor.InetSocketAddress.Port; - string hostName = descriptor.NodeDescriptor.HostName; - Console.WriteLine(string.Format(CultureInfo.InvariantCulture, "Alloated evaluator {0} with ip {1}:{2}. Hostname is {3}", allocatedEvaluator.Id, ipAddress, port, hostName)); - Console.WriteLine(string.Format(CultureInfo.InvariantCulture, "Evaluator is assigned with {0} MB of memory and {1} cores.", descriptor.Memory, descriptor.VirtualCore)); - - if (control.Equals("submitContext", StringComparison.OrdinalIgnoreCase)) - { - allocatedEvaluator.SubmitContext(contextConfiguration); - } - else if (control.Equals("submitContextAndServiceAndTask", StringComparison.OrdinalIgnoreCase)) - { - allocatedEvaluator.SubmitContextAndServiceAndTask(contextConfiguration, serviceConfiguration, mergedTaskConfiguration); - } - else - { - // default behavior - allocatedEvaluator.SubmitContextAndTask(contextConfiguration, mergedTaskConfiguration); - } - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7824be83/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloCompletedEvaluatorHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloCompletedEvaluatorHandler.cs b/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloCompletedEvaluatorHandler.cs deleted file mode 100644 index c256a89..0000000 --- a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloCompletedEvaluatorHandler.cs +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Globalization; -using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Tang.Annotations; - -namespace Org.Apache.REEF.Examples.HelloCLRBridge.Handlers -{ - /// <summary> - /// Sample implementaion of RunningTaskHandler - /// </summary> - public class HelloCompletedEvaluatorHandler : IObserver<ICompletedEvaluator> - { - [Inject] - public HelloCompletedEvaluatorHandler() - { - } - - public void OnNext(ICompletedEvaluator completedEvaluator) - { - string messageStr = string.Format( - CultureInfo.InvariantCulture, - "HelloCompletedEvaluatorHandler: Evaluator [{0}] is done.", - completedEvaluator.Id); - Console.WriteLine(messageStr); - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7824be83/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloDriverRestartActiveContextHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloDriverRestartActiveContextHandler.cs b/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloDriverRestartActiveContextHandler.cs deleted file mode 100644 index 17f4074..0000000 --- a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloDriverRestartActiveContextHandler.cs +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Globalization; -using Org.Apache.REEF.Driver.Context; -using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Tang.Annotations; - -namespace Org.Apache.REEF.Examples.HelloCLRBridge.Handlers -{ - public class HelloDriverRestartActiveContextHandler : IObserver<IActiveContext> - { - [Inject] - public HelloDriverRestartActiveContextHandler() - { - } - - public void OnNext(IActiveContext activeContext) - { - Console.WriteLine( - string.Format( - CultureInfo.InvariantCulture, - "Active context {0} received after driver restart, from evaluator {1}", - activeContext.Id, - activeContext.EvaluatorId)); - - IEvaluatorDescriptor evaluatorDescriptor = activeContext.EvaluatorDescriptor; - string ipAddress = evaluatorDescriptor.NodeDescriptor.InetSocketAddress.Address.ToString(); - int port = evaluatorDescriptor.NodeDescriptor.InetSocketAddress.Port; - string hostName = evaluatorDescriptor.NodeDescriptor.HostName; - - Console.WriteLine( - string.Format( - CultureInfo.InvariantCulture, - "The running evaluator allocated by previous driver is assigned with {0} MB of memory and is running at ip: {1} and port {2}, with hostname {3}", - evaluatorDescriptor.Memory, - ipAddress, - port, - hostName)); - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7824be83/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloDriverRestartRunningTaskHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloDriverRestartRunningTaskHandler.cs b/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloDriverRestartRunningTaskHandler.cs deleted file mode 100644 index 5250605..0000000 --- a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloDriverRestartRunningTaskHandler.cs +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Globalization; -using Org.Apache.REEF.Driver.Context; -using Org.Apache.REEF.Driver.Task; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Utilities; - -namespace Org.Apache.REEF.Examples.HelloCLRBridge.Handlers -{ - /// <summary> - /// Sample implementaion of RunningTaskHandler - /// </summary> - public class HelloDriverRestartRunningTaskHandler : IObserver<IRunningTask> - { - [Inject] - public HelloDriverRestartRunningTaskHandler() - { - } - - public void OnNext(IRunningTask runningTask) - { - IActiveContext context = runningTask.ActiveContext; - - Console.WriteLine(string.Format( - CultureInfo.InvariantCulture, - "HelloDriverRestartRunningTaskHandler: Task [{0}] is running after driver restart. Evaluator id: [{1}].", - runningTask.Id, - context.EvaluatorId)); - - runningTask.Send(ByteUtilities.StringToByteArrays( - string.Format( - CultureInfo.InvariantCulture, - "Hello, task {0}! Glad to know that you are still running in Evaluator {1} after driver restart!", - runningTask.Id, - context.EvaluatorId))); - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7824be83/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloDriverStartHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloDriverStartHandler.cs b/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloDriverStartHandler.cs deleted file mode 100644 index f1eef50..0000000 --- a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloDriverStartHandler.cs +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; - -using Org.Apache.REEF.Driver; -using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Utilities.Logging; - -namespace Org.Apache.REEF.Examples.HelloCLRBridge.Handlers -{ - public sealed class HelloDriverStartHandler : IObserver<IDriverStarted> - { - private static readonly Logger _Logger = Logger.GetLogger(typeof(HelloDriverStartHandler)); - private readonly IEvaluatorRequestor _evaluatorRequestor; - - [Inject] - private HelloDriverStartHandler(IEvaluatorRequestor evaluatorRequestor) - { - _evaluatorRequestor = evaluatorRequestor; - } - - public void OnError(Exception error) - { - throw error; - } - - public void OnCompleted() - { - } - - /// <summary> - /// Called to start the user mode driver - /// </summary> - /// <param name="driverStarted"></param> - public void OnNext(IDriverStarted driverStarted) - { - _Logger.Log(Level.Info, string.Format("HelloDriver started at {0}", driverStarted.StartTime)); - - int evaluatorsNumber = 1; - int memory = 512; - int core = 2; - string rack = "WonderlandRack"; - string evaluatorBatchId = "evaluatorThatRequires512MBofMemory"; - EvaluatorRequest request = new EvaluatorRequest(evaluatorsNumber, memory, core, rack, evaluatorBatchId); - - _evaluatorRequestor.Submit(request); - - evaluatorsNumber = 1; - memory = 1999; - core = 2; - rack = "WonderlandRack"; - evaluatorBatchId = "evaluatorThatRequires1999MBofMemory"; - request = new EvaluatorRequest(evaluatorsNumber, memory, core, rack, evaluatorBatchId); - _evaluatorRequestor.Submit(request); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7824be83/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloEvaluatorRequestorHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloEvaluatorRequestorHandler.cs b/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloEvaluatorRequestorHandler.cs deleted file mode 100644 index c74c81a..0000000 --- a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloEvaluatorRequestorHandler.cs +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Tang.Annotations; - -namespace Org.Apache.REEF.Examples.HelloCLRBridge.Handlers -{ - public class HelloEvaluatorRequestorHandler : IObserver<IEvaluatorRequestor> - { - [Inject] - public HelloEvaluatorRequestorHandler() - { - } - - public void OnNext(IEvaluatorRequestor evalutorRequestor) - { - int evaluatorsNumber = 1; - int memory = 512; - int core = 2; - string rack = "WonderlandRack"; - string evaluatorBatchId = "evaluatorThatRequires512MBofMemory"; - EvaluatorRequest request = new EvaluatorRequest(evaluatorsNumber, memory, core, rack, evaluatorBatchId); - - evalutorRequestor.Submit(request); - - evaluatorsNumber = 1; - memory = 1999; - core = 2; - rack = "WonderlandRack"; - evaluatorBatchId = "evaluatorThatRequires1999MBofMemory"; - request = new EvaluatorRequest(evaluatorsNumber, memory, core, rack, evaluatorBatchId); - evalutorRequestor.Submit(request); - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7824be83/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloFailedEvaluatorHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloFailedEvaluatorHandler.cs b/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloFailedEvaluatorHandler.cs deleted file mode 100644 index 66290ef..0000000 --- a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloFailedEvaluatorHandler.cs +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Tang.Annotations; - -namespace Org.Apache.REEF.Examples.HelloCLRBridge.Handlers -{ - public class HelloFailedEvaluatorHandler : IObserver<IFailedEvaluator> - { - private static int _failureCount = 0; - - private static readonly int _maxTrial = 2; - - [Inject] - public HelloFailedEvaluatorHandler() - { - } - - public void OnNext(IFailedEvaluator failedEvaluator) - { - Console.WriteLine("Receive a failed evaluator: " + failedEvaluator.Id); - if (++_failureCount < _maxTrial) - { - Console.WriteLine("Requesting another evaluator"); - EvaluatorRequest newRequest = new EvaluatorRequest(1, 512, "somerack"); - IEvaluatorRequestor requestor = failedEvaluator.GetEvaluatorRequetor(); - if (failedEvaluator.GetEvaluatorRequetor() != null) - { - requestor.Submit(newRequest); - } - } - else - { - Console.WriteLine("Exceed max retries number"); - throw new Exception("Unrecoverable evaluator failure."); - } - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7824be83/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloFailedTaskHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloFailedTaskHandler.cs b/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloFailedTaskHandler.cs deleted file mode 100644 index 49b10c1..0000000 --- a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloFailedTaskHandler.cs +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Globalization; -using Org.Apache.REEF.Driver.Task; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Utilities; - -namespace Org.Apache.REEF.Examples.HelloCLRBridge.Handlers -{ - public class HelloFailedTaskHandler : IObserver<IFailedTask> - { - [Inject] - public HelloFailedTaskHandler() - { - } - - public void OnNext(IFailedTask failedTask) - { - string errorMessage = string.Format( - CultureInfo.InvariantCulture, - "Task [{0}] has failed caused by [{1}], with message [{2}] and description [{3}]. The raw data for failure is [{4}].", - failedTask.Id, - failedTask.Reason.IsPresent() ? failedTask.Reason.Value : string.Empty, - failedTask.Message, - failedTask.Description.IsPresent() ? failedTask.Description.Value : string.Empty, - failedTask.Data.IsPresent() ? ByteUtilities.ByteArrarysToString(failedTask.Data.Value) : string.Empty); - - Console.WriteLine(errorMessage); - - if (failedTask.GetActiveContext().IsPresent()) - { - Console.WriteLine("Disposing the active context the failed task ran in."); - - // we must do something here: either close the context or resubmit a task to the active context - failedTask.GetActiveContext().Value.Dispose(); - } - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7824be83/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloHttpHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloHttpHandler.cs b/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloHttpHandler.cs deleted file mode 100644 index ca1a810..0000000 --- a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloHttpHandler.cs +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System.Globalization; -using System.Net; -using Org.Apache.REEF.Driver.Bridge; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Utilities; -using Org.Apache.REEF.Utilities.Logging; - -namespace Org.Apache.REEF.Examples.HelloCLRBridge.Handlers -{ - public class HelloHttpHandler : IHttpHandler - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(HttpServerHandler)); - - [Inject] - public HelloHttpHandler() - { - } - - public string GetSpecification() - { - return "NRT"; //Client Example - } - - public void OnHttpRequest(ReefHttpRequest requet, ReefHttpResponse response) - { - LOGGER.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "HelloHttpHandler OnHttpRequest: URL: {0}, QueryString: {1}, inputStream: {2}.", requet.Url, requet.Querystring, ByteUtilities.ByteArrarysToString(requet.InputStream))); - response.Status = HttpStatusCode.OK; - response.OutputStream = - ByteUtilities.StringToByteArrays("Byte array returned from HelloHttpHandler in CLR!!!"); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7824be83/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloRestartHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloRestartHandler.cs b/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloRestartHandler.cs deleted file mode 100644 index 5ce86a2..0000000 --- a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloRestartHandler.cs +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Wake.Time.Event; - -namespace Org.Apache.REEF.Examples.HelloCLRBridge.Handlers -{ - public class HelloRestartHandler : IObserver<StartTime> - { - [Inject] - public HelloRestartHandler() - { - } - - public void OnNext(StartTime value) - { - Console.WriteLine("Hello from CLR: we are informed that Driver has restarted at " + new DateTime(value.TimeStamp)); - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7824be83/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloRunningTaskHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloRunningTaskHandler.cs b/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloRunningTaskHandler.cs deleted file mode 100644 index 404164d..0000000 --- a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloRunningTaskHandler.cs +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Globalization; -using Org.Apache.REEF.Driver.Context; -using Org.Apache.REEF.Driver.Task; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Utilities; - -namespace Org.Apache.REEF.Examples.HelloCLRBridge.Handlers -{ - /// <summary> - /// Sample implementaion of RunningTaskHandler - /// </summary> - public class HelloRunningTaskHandler : IObserver<IRunningTask> - { - [Inject] - public HelloRunningTaskHandler() - { - } - - public void OnNext(IRunningTask runningTask) - { - IActiveContext context = runningTask.ActiveContext; - - string messageStr = string.Format( - CultureInfo.InvariantCulture, - "HelloRunningTaskHandler: Task [{0}] is running. Evaluator id: [{1}].", - runningTask.Id, - context.EvaluatorId); - Console.WriteLine(messageStr); - - byte[] message = ByteUtilities.StringToByteArrays(messageStr); - - runningTask.Send(message); - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7824be83/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloSimpleEventHandlers.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloSimpleEventHandlers.cs b/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloSimpleEventHandlers.cs deleted file mode 100644 index f54a1d9..0000000 --- a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloSimpleEventHandlers.cs +++ /dev/null @@ -1,421 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Collections.Generic; -using System.Globalization; -using System.Net; -using System.Text; -using Org.Apache.REEF.Common.Io; -using Org.Apache.REEF.Common.Tasks; -using Org.Apache.REEF.Driver; -using Org.Apache.REEF.Driver.Bridge; -using Org.Apache.REEF.Driver.Context; -using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Driver.Task; -using Org.Apache.REEF.Examples.Tasks.HelloTask; -using Org.Apache.REEF.Network.Naming; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Tang.Util; -using Org.Apache.REEF.Utilities; -using Org.Apache.REEF.Utilities.Logging; -using IRunningTask = Org.Apache.REEF.Driver.Task.IRunningTask; - -namespace Org.Apache.REEF.Examples.HelloCLRBridge.Handlers -{ - enum DriverStatus - { - Init = 0, - Idle = 1, - RunningTasks = 2, - CompleteTasks = 3 - } - - internal enum TaskStatus - { - Submitting = 0, - Running = 1, - Completed = 2 - } - - /// <summary> - /// A demo class that contains basic handlers. It runs given tasks and is able to get request from http server and start to ren the tasks again. - /// It handle various http requests. It also monitoring task status and driver status. - /// </summary> - public class HelloSimpleEventHandlers : - IObserver<IEvaluatorRequestor>, - IObserver<IAllocatedEvaluator>, - IObserver<IActiveContext>, - IObserver<ICompletedTask>, - IObserver<IRunningTask>, - IObserver<IFailedTask>, - IObserver<IFailedEvaluator>, - IObserver<ICompletedEvaluator>, - IStartHandler, - IHttpHandler - { - private const int NumberOfTasks = 5; - private static readonly Logger LOGGER = Logger.GetLogger(typeof(HelloSimpleEventHandlers)); - private IAllocatedEvaluator _allocatedEvaluator; - private IActiveContext _activeContext; - private readonly IList<IActiveContext> _activeContexts = new List<IActiveContext>(); - private DriverStatus driveStatus; - private TaskContext _taskContext; - - [Inject] - public HelloSimpleEventHandlers() - { - LOGGER.Log(Level.Info, "HelloSimpleEventHandlers constructor"); - CreateClassHierarchy(); - Identifier = "HelloSimpleEventHandlers"; - _taskContext = new TaskContext(); - _taskContext.TotalTasks = NumberOfTasks; - driveStatus = DriverStatus.Init; - } - - public string Identifier { get; set; } - - public static string ParsePathInfo(string pathInfo) - { - string[] p = pathInfo.Split('/'); - foreach (string s in p) - { - LOGGER.Log(Level.Info, s); - } - if (p.Length > 3) - { - return p[3]; - } - return null; - } - - public static void BuildHttpResponse( - ReefHttpResponse response, - HttpStatusCode httpStatusCode, - string strResponse) - { - response.Status = httpStatusCode; - response.OutputStream = ByteUtilities.StringToByteArrays(strResponse); - } - - public static void BuildHttpResponse( - ReefHttpResponse response, - HttpStatusCode httpStatusCode, - byte[] bytesResponse) - { - response.Status = httpStatusCode; - response.OutputStream = bytesResponse; - } - - public void OnNext(IEvaluatorRequestor evalutorRequestor) - { - using (LOGGER.LogFunction("HelloSimpleEventHandlers::evalutorRequestor received")) - { - int evaluatorsNumber = 2; - int memory = 1024 * 3; - int cpuCoreCount = 1; - string rack = "WonderlandRack"; - string evaluatorBatchId = "evaluatorThatRequires3GBofMemory"; - EvaluatorRequest request = new EvaluatorRequest(evaluatorsNumber, memory, cpuCoreCount, rack, evaluatorBatchId); - - evalutorRequestor.Submit(request); - } - } - - public void OnNext(IAllocatedEvaluator allocatedEvaluator) - { - string taskId = "Task_" + allocatedEvaluator.Id; - using (LOGGER.LogFunction("HelloSimpleEventHandlers::allocatedEvaluator received {0}.", taskId)) - { - _allocatedEvaluator = allocatedEvaluator; - - IConfiguration contextConfiguration = ContextConfiguration.ConfigurationModule.Set(ContextConfiguration.Identifier, "HelloSimpleEventHandlersContext_" + Guid.NewGuid().ToString("N")).Build(); - - allocatedEvaluator.SubmitContext(contextConfiguration); - } - } - - public void OnNext(IActiveContext activeContext) - { - using (LOGGER.LogFunction("HelloSimpleEventHandlers::activeContext received")) - { - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Received activeContext, EvaluatorId id: {0}", activeContext.EvaluatorId)); - _activeContext = activeContext; - _activeContexts.Add(activeContext); - driveStatus = DriverStatus.RunningTasks; - SubmitNextTask(activeContext); - } - } - - public void OnNext(ICompletedTask value) - { - using (LOGGER.LogFunction("HelloSimpleEventHandlers::CompletedTask received")) - { - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Received CompletedTask: {0}, task id: {1}", value.Id, _taskContext.CurrentTaskId())); - _activeContext = value.ActiveContext; - _taskContext.UpdateTaskStatus(value.Id, TaskStatus.Completed); - _taskContext.TaskCompleted++; - SubmitNextTask(value.ActiveContext); - } - } - - public void OnError(Exception error) - { - LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Exception in coral handlers Msg: {1} Stack: {2}", error.Message, error.StackTrace)); - } - - public void OnCompleted() - { - } - - public void OnNext(IRunningTask value) - { - _taskContext.UpdateTaskStatus(_taskContext.CurrentTaskId(), TaskStatus.Running); - } - - public void OnNext(IFailedTask value) - { - } - - public void OnNext(IFailedEvaluator value) - { - } - - public void OnNext(ICompletedEvaluator completedEvaluator) - { - string messageStr = string.Format( - CultureInfo.InvariantCulture, - "HelloSimpleEventHandlers: Evaluator [{0}] is done.", - completedEvaluator.Id); - Console.WriteLine(messageStr); - } - - public string GetSpecification() - { - return "crystal"; - } - - public void OnHttpRequest(ReefHttpRequest request, ReefHttpResponse response) - { - string target = ParsePathInfo(request.PathInfo); - LOGGER.Log(Level.Info, "Target: " + target + ". PathInfo: " + request.PathInfo); - //if (target != null && target.ToLower(CultureInfo.CurrentCulture).Equals("driverstatus")) - if (target != null && target.Equals("driverstatus")) - { - LOGGER.Log(Level.Info, "Target: " + target + ". Driver status: " + driveStatus.ToString()); - string msg = string.Format(CultureInfo.CurrentCulture, "Current Driver status: {0} ", driveStatus.ToString()); - BuildHttpResponse(response, HttpStatusCode.OK, msg); - return; - } - - if (target != null && target.Equals("taskstatus")) - { - LOGGER.Log(Level.Info, "Target: " + target + ". TaskStatus string: " + _taskContext.TaskStatusString()); - BuildHttpResponse(response, HttpStatusCode.OK, _taskContext.TaskStatusString()); - return; - } - - if (target != null && target.ToLower(CultureInfo.CurrentCulture).Equals("run") && driveStatus == DriverStatus.Init) - { - BuildHttpResponse(response, HttpStatusCode.OK, "Driver is not ready, wait a few second then send request again!!!"); - return; - } - - if (target != null && target.ToLower(CultureInfo.CurrentCulture).Equals("run") && driveStatus == DriverStatus.RunningTasks) - { - string msg = string.Format(CultureInfo.CurrentCulture, - "A job is running. Please check driver status and then submit your job again."); - BuildHttpResponse(response, HttpStatusCode.OK, msg); - return; - } - - if (target != null && target.ToLower(CultureInfo.CurrentCulture).Equals("run") && driveStatus == DriverStatus.Idle) - { - string numberOfTasks = getQueryValue(request.Querystring, "numberoftasks"); - if (numberOfTasks == null) - { - BuildHttpResponse(response, HttpStatusCode.OK, "Please specify number of tasks to run"); - return; - } - - driveStatus = DriverStatus.RunningTasks; - using (LOGGER.LogFunction("HelloSimpleEventHandlers::Processing a new Job from web request")) - { - _taskContext = new TaskContext(); - _taskContext.TotalTasks = int.Parse(numberOfTasks, CultureInfo.CurrentCulture); - BuildHttpResponse(response, HttpStatusCode.OK, "Job from web request is submitted and is running!!!"); - } - - foreach (var c in _activeContexts) - { - SubmitNextTask(c); - } - return; - } - BuildHttpResponse(response, HttpStatusCode.OK, "Unsupported query"); - } - - private static IDictionary<string, string> ParseQueryString(string queryString) - { - IDictionary<string, string> queryPairs = new Dictionary<string, string>(); - if (queryString != null && queryString.Length > 0) - { - string[] queries = queryString.Split('&'); - foreach (string query in queries) - { - string[] pairs = query.Split('='); - if (pairs.Length == 2 && !pairs[0].Equals(string.Empty) && !pairs[1].Equals(string.Empty)) - { - queryPairs[pairs[0]] = pairs[1]; - LOGGER.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "query key: {0}, Query value: {1}.", pairs[0], pairs[1])); - } - } - } - return queryPairs; - } - - private static string getQueryValue(string queryString, string name) - { - IDictionary<string, string> pairs = ParseQueryString(queryString); - string v; - pairs.TryGetValue(name, out v); - return v; - } - - private void CreateClassHierarchy() - { - HashSet<string> clrDlls = new HashSet<string>(); - clrDlls.Add(typeof(IDriver).Assembly.GetName().Name); - clrDlls.Add(typeof(ITask).Assembly.GetName().Name); - clrDlls.Add(typeof(HelloTask).Assembly.GetName().Name); - clrDlls.Add(typeof(INameClient).Assembly.GetName().Name); - clrDlls.Add(typeof(NameClient).Assembly.GetName().Name); - - ClrHandlerHelper.GenerateClassHierarchy(clrDlls); - } - - private void SubmitNextTask(IActiveContext activeContext) - { - LOGGER.Log(Level.Info, "SubmitNextTask with evaluatorid: " + activeContext.EvaluatorId); - IConfiguration finalConfiguration = GetNextTaskConfiguration(); - if (null != finalConfiguration) - { - LOGGER.Log(Level.Info, "Executing task id " + _taskContext.CurrentTaskId()); - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Submitting Task {0}", _taskContext.CurrentTaskId())); - - activeContext.SubmitTask(finalConfiguration); - } - else - { - if (_taskContext.TaskCompleted == _taskContext.TotalTasks) - { - LOGGER.Log(Level.Info, "All tasks submitted and completed, active context remian idle"); - driveStatus = DriverStatus.Idle; - } - } - } - - private IConfiguration GetNextTaskConfiguration() - { - string nextTaskId = _taskContext.NextTaskId(); - LOGGER.Log(Level.Info, "GetNextTaskConfiguration, nextTaskId: " + nextTaskId); - if (nextTaskId != null) - { - IConfiguration taskConfiguration = TaskConfiguration.ConfigurationModule - .Set(TaskConfiguration.Identifier, nextTaskId) - .Set(TaskConfiguration.Task, GenericType<HelloTask>.Class) - .Set(TaskConfiguration.OnMessage, GenericType<HelloTask.HelloDriverMessageHandler>.Class) - .Set(TaskConfiguration.OnSendMessage, GenericType<HelloTaskMessage>.Class) - .Build(); - return taskConfiguration; - } - return null; - } - } - - class TaskContext - { - private readonly IList<string> taskIds = new List<string>(); - - private readonly IDictionary<string, TaskStatus> tasks = new Dictionary<string, TaskStatus>(); - - public TaskContext() - { - NextTaskIndex = 0; - TaskCompleted = 0; - } - - public int TotalTasks { get; set; } - - public int NextTaskIndex { get; set; } - - public int TaskCompleted { get; set; } - - public string NextTaskId() - { - Console.WriteLine("NextTaskId: " + NextTaskIndex); - if (NextTaskIndex < TotalTasks) - { - string id = "Jan7DemoTask_" + DateTime.Now.Ticks; - taskIds.Add(id); - tasks.Add(id, TaskStatus.Submitting); - NextTaskIndex++; - return id; - } - return null; - } - - public string CurrentTaskId() - { - Console.WriteLine("CurrentTaskIndex: " + (NextTaskIndex - 1)); - if (NextTaskIndex <= TotalTasks) - { - Console.WriteLine("CurrentTaskId: " + taskIds[NextTaskIndex - 1]); - return taskIds[NextTaskIndex - 1]; - } - return null; //either not started or completed - } - - public void UpdateTaskStatus(string taskId, TaskStatus status) - { - tasks[taskId] = status; - } - - public string TaskStatusString() - { - Console.WriteLine("TaskStatusString 1, nextTaskIndex: " + NextTaskIndex); - StringBuilder sb = new StringBuilder(); - - if (tasks.Count > 0) - { - foreach (var pair in tasks) - { - sb.AppendLine("Task id: " + pair.Key + " Task status: " + pair.Value.ToString()); - } - } - else - { - sb.Append("No task is running yet"); - } - - return sb.ToString(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7824be83/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloStartHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloStartHandler.cs b/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloStartHandler.cs deleted file mode 100644 index 0d8fc55..0000000 --- a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloStartHandler.cs +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System.Collections.Generic; -using Org.Apache.REEF.Common.Io; -using Org.Apache.REEF.Common.Tasks; -using Org.Apache.REEF.Driver; -using Org.Apache.REEF.Driver.Bridge; -using Org.Apache.REEF.Examples.Tasks.HelloTask; -using Org.Apache.REEF.Network.Naming; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Utilities.Logging; - -namespace Org.Apache.REEF.Examples.HelloCLRBridge.Handlers -{ - public class HelloStartHandler : IStartHandler - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(HelloStartHandler)); - - [Inject] - public HelloStartHandler(HttpServerPort httpServerPort) - { - CreateClassHierarchy(); - Identifier = "HelloStartHandler"; - LOGGER.Log(Level.Info, "HttpPort received in HelloStartHandler: " + httpServerPort.PortNumber); - } - - public HelloStartHandler(string id) - { - Identifier = id; - CreateClassHierarchy(); - } - - public string Identifier { get; set; } - - private void CreateClassHierarchy() - { - HashSet<string> clrDlls = new HashSet<string>(); - clrDlls.Add(typeof(IDriver).Assembly.GetName().Name); - clrDlls.Add(typeof(ITask).Assembly.GetName().Name); - clrDlls.Add(typeof(HelloTask).Assembly.GetName().Name); - clrDlls.Add(typeof(INameClient).Assembly.GetName().Name); - clrDlls.Add(typeof(NameClient).Assembly.GetName().Name); - - ClrHandlerHelper.GenerateClassHierarchy(clrDlls); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7824be83/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloTaskMessageHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloTaskMessageHandler.cs b/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloTaskMessageHandler.cs deleted file mode 100644 index e6675ef..0000000 --- a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/Handlers/HelloTaskMessageHandler.cs +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Globalization; -using System.Text; -using Org.Apache.REEF.Driver.Task; -using Org.Apache.REEF.Tang.Annotations; - -namespace Org.Apache.REEF.Examples.HelloCLRBridge.Handlers -{ - public class HelloTaskMessageHandler : IObserver<ITaskMessage> - { - [Inject] - public HelloTaskMessageHandler() - { - } - - public void OnNext(ITaskMessage taskMessage) - { - Console.WriteLine(string.Format( - CultureInfo.InvariantCulture, - "CLR HelloTaskMessageHandler received following message from Task: {0}, Message: {1}.", - taskMessage.TaskId, - Encoding.UTF8.GetString(taskMessage.Message))); - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7824be83/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/HelloTraceListener.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/HelloTraceListener.cs b/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/HelloTraceListener.cs deleted file mode 100644 index 9dc01b7..0000000 --- a/lang/cs/Org.Apache.REEF.Examples/HelloCLRBridge/HelloTraceListener.cs +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System.Diagnostics; -using Org.Apache.REEF.Tang.Annotations; - -namespace Org.Apache.REEF.Examples.HelloCLRBridge -{ - /// <summary> - /// This is a sample implemenation on how custom trace listner can be implemented - /// </summary> - public class HelloTraceListener : TraceListener - { - private readonly TraceListener _listener; - - [Inject] - public HelloTraceListener() - { - _listener = new ConsoleTraceListener(); - } - - public override void Write(string message) - { - _listener.Write("[helloTrace]" + message ); - } - - public override void WriteLine(string message) - { - _listener.WriteLine("[helloTrace]" + message); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7824be83/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj b/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj index 1557198..ee1c3f9 100644 --- a/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj +++ b/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj @@ -35,23 +35,6 @@ under the License. <Reference Include="System.Runtime.Serialization" /> </ItemGroup> <ItemGroup> - <Compile Include="HelloCLRBridge\Handlers\AnotherHelloAllocatedEvaluatorHandler.cs" /> - <Compile Include="HelloCLRBridge\Handlers\HelloActiveContextHandler.cs" /> - <Compile Include="HelloCLRBridge\Handlers\HelloAllocatedEvaluatorHandler.cs" /> - <Compile Include="HelloCLRBridge\Handlers\HelloCompletedEvaluatorHandler.cs" /> - <Compile Include="HelloCLRBridge\Handlers\HelloDriverRestartActiveContextHandler.cs" /> - <Compile Include="HelloCLRBridge\Handlers\HelloDriverRestartRunningTaskHandler.cs" /> - <Compile Include="HelloCLRBridge\Handlers\HelloDriverStartHandler.cs" /> - <Compile Include="HelloCLRBridge\Handlers\HelloEvaluatorRequestorHandler.cs" /> - <Compile Include="HelloCLRBridge\Handlers\HelloFailedEvaluatorHandler.cs" /> - <Compile Include="HelloCLRBridge\Handlers\HelloFailedTaskHandler.cs" /> - <Compile Include="HelloCLRBridge\Handlers\HelloHttpHandler.cs" /> - <Compile Include="HelloCLRBridge\Handlers\HelloRestartHandler.cs" /> - <Compile Include="HelloCLRBridge\Handlers\HelloRunningTaskHandler.cs" /> - <Compile Include="HelloCLRBridge\Handlers\HelloSimpleEventHandlers.cs" /> - <Compile Include="HelloCLRBridge\Handlers\HelloStartHandler.cs" /> - <Compile Include="HelloCLRBridge\Handlers\HelloTaskMessageHandler.cs" /> - <Compile Include="HelloCLRBridge\HelloTraceListener.cs" /> <Compile Include="MachineLearning\KMeans\Centroids.cs" /> <Compile Include="MachineLearning\KMeans\codecs\CentroidsCodec.cs" /> <Compile Include="MachineLearning\KMeans\codecs\DataVectorCodec.cs" /> @@ -105,6 +88,9 @@ under the License. <Name>Org.Apache.REEF.Wake</Name> </ProjectReference> </ItemGroup> + <ItemGroup> + <Folder Include="HelloCLRBridge\Handlers\" /> + </ItemGroup> <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" /> <!-- To modify your build process, add your task inside one of the targets below and uncomment it. http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7824be83/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/HelloSimpleEventHandlers.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/HelloSimpleEventHandlers.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/HelloSimpleEventHandlers.cs new file mode 100644 index 0000000..545c6fb --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/HelloSimpleEventHandlers.cs @@ -0,0 +1,421 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Globalization; +using System.Net; +using System.Text; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Bridge; +using Org.Apache.REEF.Driver.Context; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.Examples.Tasks.HelloTask; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities; +using Org.Apache.REEF.Utilities.Logging; +using IRunningTask = Org.Apache.REEF.Driver.Task.IRunningTask; + +namespace Org.Apache.REEF.Tests.Functional.Bridge +{ + enum DriverStatus + { + Init = 0, + Idle = 1, + RunningTasks = 2, + CompleteTasks = 3 + } + + internal enum TaskStatus + { + Submitting = 0, + Running = 1, + Completed = 2 + } + + /// <summary> + /// A demo class that contains basic handlers. It runs given tasks and is able to get request from http server and start to run the tasks again. + /// It handle various http requests. It also monitoring task status and driver status. + /// When IsRetain is configured as false. Context will be closed after executing all the tasks. + /// The default of IsRetain is true, that would retain the evaluators and wait for http requests to submit more jobs. + /// </summary> + public class HelloSimpleEventHandlers : + IObserver<IDriverStarted>, + IObserver<IAllocatedEvaluator>, + IObserver<IActiveContext>, + IObserver<ICompletedTask>, + IObserver<IRunningTask>, + IObserver<IFailedTask>, + IObserver<IFailedEvaluator>, + IObserver<ICompletedEvaluator>, + IHttpHandler + { + private const int NumberOfTasks = 5; + private static readonly Logger Logger = Logger.GetLogger(typeof(HelloSimpleEventHandlers)); + private readonly IList<IActiveContext> _activeContexts = new List<IActiveContext>(); + private DriverStatus _driveStatus; + private TaskContext _taskContext; + private readonly bool _isRetain; + private readonly int _numberOfEvaluators; + private readonly IEvaluatorRequestor _evaluatorRequestor; + + [Inject] + private HelloSimpleEventHandlers(IEvaluatorRequestor evaluatorRequestor, + [Parameter(typeof(NumberOfEvaluators))] int numberOfEvaluators, + [Parameter(typeof(IsRetain))] bool isRetain) + { + Logger.Log(Level.Info, "HelloSimpleEventHandlers constructor"); + _taskContext = new TaskContext(); + _taskContext.TotalTasks = NumberOfTasks; + _driveStatus = DriverStatus.Init; + _isRetain = isRetain; + _numberOfEvaluators = numberOfEvaluators; + _evaluatorRequestor = evaluatorRequestor; + } + + public static string ParsePathInfo(string pathInfo) + { + string[] p = pathInfo.Split('/'); + foreach (string s in p) + { + Logger.Log(Level.Info, s); + } + if (p.Length > 3) + { + return p[3]; + } + return null; + } + + public static void BuildHttpResponse( + ReefHttpResponse response, + HttpStatusCode httpStatusCode, + string strResponse) + { + response.Status = httpStatusCode; + response.OutputStream = ByteUtilities.StringToByteArrays(strResponse); + } + + public static void BuildHttpResponse( + ReefHttpResponse response, + HttpStatusCode httpStatusCode, + byte[] bytesResponse) + { + response.Status = httpStatusCode; + response.OutputStream = bytesResponse; + } + + public void OnNext(IDriverStarted driverStarted) + { + using (Logger.LogFunction("HelloSimpleEventHandlers::evalutorRequestor received")) + { + int evaluatorsNumber = _numberOfEvaluators; + int memory = 1024 * 3; + int cpuCoreCount = 1; + string rack = "WonderlandRack"; + string evaluatorBatchId = "evaluatorThatRequires3GBofMemory"; + EvaluatorRequest request = new EvaluatorRequest(evaluatorsNumber, memory, cpuCoreCount, rack, evaluatorBatchId); + + _evaluatorRequestor.Submit(request); + } + } + + public void OnNext(IAllocatedEvaluator allocatedEvaluator) + { + string taskId = "Task_" + allocatedEvaluator.Id; + var descriptor = allocatedEvaluator.GetEvaluatorDescriptor(); + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator is assigned with {0} MB of memory and {1} cores.", descriptor.Memory, descriptor.VirtualCore)); + + using (Logger.LogFunction("HelloSimpleEventHandlers::allocatedEvaluator received {0}.", taskId)) + { + IConfiguration contextConfiguration = ContextConfiguration.ConfigurationModule.Set(ContextConfiguration.Identifier, "HelloSimpleEventHandlersContext_" + Guid.NewGuid().ToString("N")).Build(); + allocatedEvaluator.SubmitContext(contextConfiguration); + } + } + + public void OnNext(IActiveContext activeContext) + { + using (Logger.LogFunction("HelloSimpleEventHandlers::activeContext received")) + { + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Received activeContext, EvaluatorId id: {0}", activeContext.EvaluatorId)); + _activeContexts.Add(activeContext); + _driveStatus = DriverStatus.RunningTasks; + SubmitNextTask(activeContext); + } + } + + public void OnNext(ICompletedTask value) + { + using (Logger.LogFunction("HelloSimpleEventHandlers::CompletedTask received")) + { + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Received CompletedTask: {0}, task id: {1}", value.Id, _taskContext.CurrentTaskId())); + _taskContext.UpdateTaskStatus(value.Id, TaskStatus.Completed); + _taskContext.TaskCompleted++; + SubmitNextTask(value.ActiveContext); + } + } + + public void OnError(Exception error) + { + Logger.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Exception in coral handlers Msg: {1} Stack: {2}", error.Message, error.StackTrace)); + } + + public void OnCompleted() + { + } + + public void OnNext(IRunningTask value) + { + _taskContext.UpdateTaskStatus(_taskContext.CurrentTaskId(), TaskStatus.Running); + } + + public void OnNext(IFailedTask value) + { + } + + public void OnNext(IFailedEvaluator value) + { + } + + public void OnNext(ICompletedEvaluator completedEvaluator) + { + string messageStr = string.Format( + CultureInfo.InvariantCulture, + "HelloSimpleEventHandlers: Evaluator [{0}] is done.", + completedEvaluator.Id); + Console.WriteLine(messageStr); + } + + public string GetSpecification() + { + return "crystal"; + } + + public void OnHttpRequest(ReefHttpRequest request, ReefHttpResponse response) + { + string target = ParsePathInfo(request.PathInfo); + Logger.Log(Level.Info, "Target: " + target + ". PathInfo: " + request.PathInfo); + if (target != null && target.Equals("driverstatus")) + { + Logger.Log(Level.Info, "Target: " + target + ". Driver status: " + _driveStatus.ToString()); + string msg = string.Format(CultureInfo.CurrentCulture, "Current Driver status: {0} ", _driveStatus.ToString()); + BuildHttpResponse(response, HttpStatusCode.OK, msg); + return; + } + + if (target != null && target.Equals("taskstatus")) + { + Logger.Log(Level.Info, "Target: " + target + ". TaskStatus string: " + _taskContext.TaskStatusString()); + BuildHttpResponse(response, HttpStatusCode.OK, _taskContext.TaskStatusString()); + return; + } + + if (target != null && target.ToLower(CultureInfo.CurrentCulture).Equals("run") && _driveStatus == DriverStatus.Init) + { + BuildHttpResponse(response, HttpStatusCode.OK, "Driver is not ready, wait a few second then send request again!!!"); + return; + } + + if (target != null && target.ToLower(CultureInfo.CurrentCulture).Equals("run") && _driveStatus == DriverStatus.RunningTasks) + { + string msg = string.Format(CultureInfo.CurrentCulture, + "A job is running. Please check driver status and then submit your job again."); + BuildHttpResponse(response, HttpStatusCode.OK, msg); + return; + } + + if (target != null && target.ToLower(CultureInfo.CurrentCulture).Equals("run") && _driveStatus == DriverStatus.Idle) + { + string numberOfTasks = GetQueryValue(request.Querystring, "numberoftasks"); + if (numberOfTasks == null) + { + BuildHttpResponse(response, HttpStatusCode.OK, "Please specify number of tasks to run"); + return; + } + + _driveStatus = DriverStatus.RunningTasks; + using (Logger.LogFunction("HelloSimpleEventHandlers::Processing a new Job from web request")) + { + _taskContext = new TaskContext(); + _taskContext.TotalTasks = int.Parse(numberOfTasks, CultureInfo.CurrentCulture); + BuildHttpResponse(response, HttpStatusCode.OK, "Job from web request is submitted and is running!!!"); + } + + foreach (var c in _activeContexts) + { + SubmitNextTask(c); + } + return; + } + BuildHttpResponse(response, HttpStatusCode.OK, "Unsupported query"); + } + + private static IDictionary<string, string> ParseQueryString(string queryString) + { + IDictionary<string, string> queryPairs = new Dictionary<string, string>(); + if (!string.IsNullOrEmpty(queryString)) + { + string[] queries = queryString.Split('&'); + foreach (string query in queries) + { + string[] pairs = query.Split('='); + if (pairs.Length == 2 && !pairs[0].Equals(string.Empty) && !pairs[1].Equals(string.Empty)) + { + queryPairs[pairs[0]] = pairs[1]; + Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "query key: {0}, Query value: {1}.", pairs[0], pairs[1])); + } + } + } + return queryPairs; + } + + private static string GetQueryValue(string queryString, string name) + { + IDictionary<string, string> pairs = ParseQueryString(queryString); + string v; + pairs.TryGetValue(name, out v); + return v; + } + + private void SubmitNextTask(IActiveContext activeContext) + { + Logger.Log(Level.Info, "SubmitNextTask with evaluator id: " + activeContext.EvaluatorId); + IConfiguration finalConfiguration = GetNextTaskConfiguration(); + if (null != finalConfiguration) + { + Logger.Log(Level.Info, "Executing task id " + _taskContext.CurrentTaskId()); + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Submitting Task {0}", _taskContext.CurrentTaskId())); + + activeContext.SubmitTask(finalConfiguration); + } + else + { + if (_taskContext.TaskCompleted == _taskContext.TotalTasks) + { + Logger.Log(Level.Info, "All tasks submitted and completed, active context remains idle"); + _driveStatus = DriverStatus.Idle; + if (!_isRetain) + { + activeContext.Dispose(); + } + } + } + } + + private IConfiguration GetNextTaskConfiguration() + { + string nextTaskId = _taskContext.NextTaskId(); + Logger.Log(Level.Info, "GetNextTaskConfiguration, nextTaskId: " + nextTaskId); + if (nextTaskId != null) + { + IConfiguration taskConfiguration = TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, nextTaskId) + .Set(TaskConfiguration.Task, GenericType<HelloTask>.Class) + .Set(TaskConfiguration.OnMessage, GenericType<HelloTask.HelloDriverMessageHandler>.Class) + .Set(TaskConfiguration.OnSendMessage, GenericType<HelloTaskMessage>.Class) + .Build(); + return taskConfiguration; + } + return null; + } + } + + class TaskContext + { + private static readonly Logger Logger = Logger.GetLogger(typeof(TaskContext)); + private readonly IList<string> _taskIds = new List<string>(); + private readonly IDictionary<string, TaskStatus> _tasks = new Dictionary<string, TaskStatus>(); + + public TaskContext() + { + NextTaskIndex = 0; + TaskCompleted = 0; + } + + public int TotalTasks { get; set; } + + public int NextTaskIndex { get; set; } + + public int TaskCompleted { get; set; } + + public string NextTaskId() + { + Logger.Log(Level.Verbose, "NextTaskId: " + NextTaskIndex); + if (NextTaskIndex < TotalTasks) + { + string id = "Jan7DemoTask_" + DateTime.Now.Ticks; + _taskIds.Add(id); + _tasks.Add(id, TaskStatus.Submitting); + NextTaskIndex++; + return id; + } + return null; + } + + public string CurrentTaskId() + { + Logger.Log(Level.Verbose, "CurrentTaskIndex: " + (NextTaskIndex - 1)); + if (NextTaskIndex <= TotalTasks) + { + Logger.Log(Level.Verbose, "CurrentTaskId: " + _taskIds[NextTaskIndex - 1]); + return _taskIds[NextTaskIndex - 1]; + } + return null; //either not started or completed + } + + public void UpdateTaskStatus(string taskId, TaskStatus status) + { + _tasks[taskId] = status; + } + + public string TaskStatusString() + { + Logger.Log(Level.Verbose, "TaskStatusString 1, nextTaskIndex: " + NextTaskIndex); + StringBuilder sb = new StringBuilder(); + + if (_tasks.Count > 0) + { + foreach (var pair in _tasks) + { + sb.AppendLine("Task id: " + pair.Key + " Task status: " + pair.Value.ToString()); + } + } + else + { + sb.Append("No task is running yet"); + } + + return sb.ToString(); + } + } + + [NamedParameter(Documentation = "NumberOfTasks", ShortName = "NumberOfTasks", DefaultValue = "2")] + class NumberOfEvaluators : Name<Int32> + { + } + + [NamedParameter(Documentation = "Retain", ShortName = "Retain", DefaultValue = "true")] + class IsRetain : Name<bool> + { + } +}
