http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Utilities/Logging/Logger.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Utilities/Logging/Logger.cs b/lang/cs/Org.Apache.REEF.Utilities/Logging/Logger.cs new file mode 100644 index 0000000..3de6e22 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Utilities/Logging/Logger.cs @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Globalization; + +namespace Org.Apache.REEF.Utilities.Logging +{ + public class Logger + { + private static readonly string[] LogLevel = new string[] + { + "OFF", + "ERROR", + "WARNING", + "START", + "EXIT", + "INFO", + "VERBOSE" + }; + + private static readonly Dictionary<Level, TraceEventType> EventTypes + = new Dictionary<Level, TraceEventType>() + { + { Level.Off, TraceEventType.Stop }, + { Level.Error, TraceEventType.Error }, + { Level.Warning, TraceEventType.Warning }, + { Level.Start, TraceEventType.Start }, + { Level.Stop, TraceEventType.Stop }, + { Level.Info, TraceEventType.Information }, + { Level.Verbose, TraceEventType.Verbose }, + }; + + private static Level _customLevel = Level.Verbose; + + private static List<TraceListener> _traceListeners; + + private string _name; + + private TraceSource _traceSource; + + private Logger(string name) + { + _name = name; + _traceSource = new TraceSource(_name, SourceLevels.All); + CustcomLevel = _customLevel; + if (TraceListeners.Count == 0) + { + // before customized listener is added, we would need to log to console + _traceSource.Listeners.Add(new ConsoleTraceListener()); + } + else + { + _traceSource.Listeners.Clear(); + foreach (TraceListener listener in TraceListeners) + { + _traceSource.Listeners.Add(listener); + } + } + } + + public static Level CustcomLevel + { + get + { + return _customLevel; + } + + set + { + _customLevel = value; + } + } + + public static List<TraceListener> TraceListeners + { + get + { + if (_traceListeners == null) + { + _traceListeners = new List<TraceListener>(); + } + return _traceListeners; + } + } + + public static void SetCustomLevel(Level customLevel) + { + _customLevel = customLevel; + } + + public static void AddTraceListner(TraceListener listener) + { + TraceListeners.Add(listener); + } + + public static Logger GetLogger(Type type) + { + return GetLogger(type.FullName); + } + + 
public static Logger GetLogger(string name) + { + return new Logger(name); + } + + /// <summary> + /// Log the message with the specified Log Level. + /// + /// If addtional arguments are passed, the message will be treated as + /// a format string. The format string and the additional arguments + /// will be formatted according to string.Format() + /// </summary> + /// <param name="level"></param> + /// <param name="formatStr"></param> + /// <param name="args"></param> + public void Log(Level level, string formatStr, params object[] args) + { + if (CustcomLevel >= level) + { + string msg = FormatMessage(formatStr, args); + string logMessage = + DateTime.Now.ToString("o", CultureInfo.InvariantCulture) + + " " + + System.Threading.Thread.CurrentThread.ManagedThreadId.ToString("D4", CultureInfo.InvariantCulture) + + Environment.NewLine + LogLevel[(int)level] + ": " + + msg; + + _traceSource.TraceEvent( + EventTypes[level], + 0, // we don't use event id for now, but this can be useful for e2e logging later + logMessage); + } + } + + public void Log(Level level, string msg, Exception exception) + { + string exceptionLog = string.Empty; + if (exception != null) + { + exceptionLog = string.Format( + CultureInfo.InvariantCulture, + "encountered error [{0}] with mesage [{1}] and stack trace [{2}]", + exception, + exception.Message, + exception.StackTrace); + } + Log(level, msg + exceptionLog); + } + + public IDisposable LogFunction(string function, params object[] args) + { + return LogScope(function, args); + } + + public IDisposable LogScope(string format, params object[] args) + { + return new LoggingScope(this, DateTime.Now + " " + format, args); + } + + private string FormatMessage(string formatStr, params object[] args) + { + return args.Length > 0 ? string.Format(CultureInfo.CurrentCulture, formatStr, args) : formatStr; + } + + /// <summary> + /// Represents a logging scope. 
+ /// </summary> + /// <remarks> + /// A start log is written when an instance is created + /// and a stop trace is written when the instance is disposed. + /// </remarks> + private sealed class LoggingScope : IDisposable + { + private readonly Stopwatch _stopWatch; + + private readonly Logger _logger; + + private readonly string _content; + + /// <summary> + /// Initializes a new instance of the LoggingScope class. + /// </summary> + /// <param name="logger"></param> + /// <param name="format"></param> + /// <param name="args"></param> + public LoggingScope(Logger logger, string format, params object[] args) + { + _logger = logger; + + _stopWatch = Stopwatch.StartNew(); + + string content = args.Length > 0 ? string.Format(CultureInfo.InvariantCulture, format, args) : format; + _content = content; + + _logger.Log(Level.Start, content); + } + + /// <summary> + /// Logs the end of a scope. + /// </summary> + public void Dispose() + { + _logger.Log(Level.Stop, string.Format(CultureInfo.InvariantCulture, "{0}. Duration: [{1}].", _content, _stopWatch.Elapsed)); + _stopWatch.Stop(); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Utilities/NetUtilities.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Utilities/NetUtilities.cs b/lang/cs/Org.Apache.REEF.Utilities/NetUtilities.cs new file mode 100644 index 0000000..7c9a92c --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Utilities/NetUtilities.cs @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using Org.Apache.REEF.Utilities.Diagnostics; +using Org.Apache.REEF.Utilities.Logging; +using System; +using System.Net; + +namespace Org.Apache.REEF.Utilities +{ + public class NetUtilities + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(NetUtilities)); + + public static IPEndPoint ParseIpEndpoint(string ipWithPort) + { + string ip = ipWithPort.TrimStart().TrimEnd(); + if (char.IsDigit(ip[0])) + { + ip = @"socket://" + ip; + } + Uri uri = new Uri(ip); + string driverAddress = uri.Host; + int driverCommunicationPort = uri.Port; + IPAddress ipAddress; + IPAddress.TryParse(driverAddress, out ipAddress); + if (ipAddress == null) + { + Exceptions.Throw(new FormatException("invalid format for ip: " + ipWithPort), LOGGER); + } + + return new IPEndPoint(ipAddress, driverCommunicationPort); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Utilities/Optional.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Utilities/Optional.cs b/lang/cs/Org.Apache.REEF.Utilities/Optional.cs new file mode 100644 index 0000000..11d95fa --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Utilities/Optional.cs @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Utilities
{
    /// <summary>
    /// Immutable container that either holds a value or is empty.
    /// </summary>
    /// <typeparam name="T">Type of the contained value.</typeparam>
    [Serializable]
    public sealed class Optional<T>
    {
        private readonly T _value;

        // Presence is tracked explicitly: for value types "_value != null" is always true,
        // which made Optional<int>.Empty() report itself as present.
        // NOTE(review): this adds a field to a [Serializable] type - confirm that no
        // instances serialized by an older build need to be read back.
        private readonly bool _hasValue;

        private Optional(T value)
        {
            _value = value;
            _hasValue = value != null;
        }

        private Optional()
        {
            _value = default(T);
            _hasValue = false;
        }

        /// <summary>
        /// The contained value, or <c>default(T)</c> when the optional is empty.
        /// </summary>
        public T Value
        {
            get { return _value; }
        }

        /// <summary>
        /// Wraps a non-null value. A null argument is reported through
        /// <c>Diagnostics.Exceptions.Throw</c> with an <see cref="ArgumentNullException"/>.
        /// </summary>
        /// <param name="value">The value to wrap.</param>
        /// <returns>An optional holding <paramref name="value"/>.</returns>
        public static Optional<T> Of(T value)
        {
            if (value == null)
            {
                Diagnostics.Exceptions.Throw(new ArgumentNullException("value", "Passed a null value. Use OfNullable() instead"), Logger.GetLogger(typeof(Optional<T>)));
            }
            return new Optional<T>(value);
        }

        /// <summary>
        /// Creates an optional that holds no value.
        /// </summary>
        /// <returns>An empty optional.</returns>
        public static Optional<T> Empty()
        {
            return new Optional<T>();
        }

        /// <summary>
        /// Wraps <paramref name="value"/>, mapping null to an empty optional.
        /// </summary>
        /// <param name="value">The value to wrap; may be null.</param>
        /// <returns>An empty optional for null, otherwise an optional holding the value.</returns>
        public static Optional<T> OfNullable(T value)
        {
            if (value == null)
            {
                return Empty();
            }
            else
            {
                return Of(value);
            }
        }

        /// <summary>
        /// Returns the contained value, or <paramref name="other"/> when the optional is empty.
        /// </summary>
        /// <param name="other">Fallback value.</param>
        /// <returns>The contained value or the fallback.</returns>
        public T OrElse(T other)
        {
            if (IsPresent())
            {
                return Value;
            }
            else
            {
                return other;
            }
        }

        /// <summary>
        /// Tells whether this optional holds a value.
        /// </summary>
        /// <returns>True when a value is held; false for an empty optional.</returns>
        public bool IsPresent()
        {
            return _hasValue;
        }

        /// <summary>
        /// Two optionals of the same closed type are equal when both are empty, or
        /// both hold values that are equal according to <see cref="object.Equals(object)"/>.
        /// </summary>
        public override bool Equals(object obj)
        {
            if (ReferenceEquals(this, obj))
            {
                return true;
            }
            if (obj == null || obj.GetType() != this.GetType())
            {
                return false;
            }
            Optional<T> optional = (Optional<T>)obj;
            if (_hasValue != optional._hasValue)
            {
                return false;
            }
            return !_hasValue || _value.Equals(optional._value);
        }

        /// <summary>
        /// Hash code of the contained value, or 0 for an empty optional.
        /// </summary>
        public override int GetHashCode()
        {
            return _hasValue ? _value.GetHashCode() : 0;
        }

        public override string ToString()
        {
            return "Optional{value=" + _value + "}";
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Utilities/Org.Apache.Reef.Utilities.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Utilities/Org.Apache.Reef.Utilities.csproj b/lang/cs/Org.Apache.REEF.Utilities/Org.Apache.Reef.Utilities.csproj new file mode 100644 index 0000000..27899de --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Utilities/Org.Apache.Reef.Utilities.csproj @@ -0,0 +1,116 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" /> + <PropertyGroup> + <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> + <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform> + <ProjectGuid>{79E7F89A-1DFB-45E1-8D43-D71A954AEB98}</ProjectGuid> + <OutputType>Library</OutputType> + <AppDesignerFolder>Properties</AppDesignerFolder> + <RootNamespace>Org.Apache.REEF.Utilities</RootNamespace> + <AssemblyName>Org.Apache.REEF.Utilities</AssemblyName> + <TargetFrameworkVersion>v4.5</TargetFrameworkVersion> + <FileAlignment>512</FileAlignment> + <RestorePackages>true</RestorePackages> + <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\</SolutionDir> + </PropertyGroup> + <Import Project="$(SolutionDir)\Source\build.props" /> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugSymbols>true</DebugSymbols> + <DebugType>full</DebugType> + <Optimize>false</Optimize> + <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath> + <DefineConstants>DEBUG;TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + <PlatformTarget>AnyCPU</PlatformTarget> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x64' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugSymbols>true</DebugSymbols> + <DebugType>full</DebugType> + <Optimize>false</Optimize> + <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath> + <DefineConstants>DEBUG;TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + <PlatformTarget>AnyCPU</PlatformTarget> + 
</PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugType>pdbonly</DebugType> + <Optimize>true</Optimize> + <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath> + <DefineConstants>TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x64' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugType>pdbonly</DebugType> + <Optimize>true</Optimize> + <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath> + <DefineConstants>TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <ItemGroup> + <Reference Include="Microsoft.Hadoop.Avro"> + <HintPath>$(PackagesDir)\Microsoft.Hadoop.Avro.$(AvroVersion)\lib\net40\Microsoft.Hadoop.Avro.dll</HintPath> + </Reference> + <Reference Include="Newtonsoft.Json"> + <HintPath>$(PackagesDir)\Newtonsoft.Json.$(NewtonsoftJsonVersion)\lib\net45\Newtonsoft.Json.dll</HintPath> + </Reference> + <Reference Include="System" /> + <Reference Include="System.Core" /> + <Reference Include="System.Xml.Linq" /> + <Reference Include="System.Data.DataSetExtensions" /> + <Reference Include="Microsoft.CSharp" /> + <Reference Include="System.Data" /> + <Reference Include="System.Xml" /> + </ItemGroup> + <ItemGroup> + <Compile Include="AvroUtils.cs" /> + <Compile Include="ByteUtilities.cs" /> + <Compile Include="Diagnostics\DiagnosticsMessages.cs" /> + <Compile Include="Diagnostics\Exceptions.cs" /> + <Compile Include="IIdentifiable.cs" /> + <Compile Include="IMessage.cs" /> + <Compile Include="Logging\JavaLoggingSetting.cs" /> + <Compile Include="Logging\Level.cs" /> + <Compile Include="Logging\Logger.cs" /> + <Compile Include="NetUtilities.cs" /> + <Compile Include="Optional.cs" /> + <Compile 
Include="Properties\AssemblyInfo.cs" /> + <Compile Include="ValidationUtilities.cs" /> + </ItemGroup> + <ItemGroup> + <None Include="packages.config" /> + </ItemGroup> + <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> + <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" /> + <!-- To modify your build process, add your task inside one of the targets below and uncomment it. + Other similar extension points exist, see Microsoft.Common.targets. + <Target Name="BeforeBuild"> + </Target> + <Target Name="AfterBuild"> + </Target> + --> +</Project> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Utilities/Properties/AssemblyInfo.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Utilities/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Utilities/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..5401a32 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Utilities/Properties/AssemblyInfo.cs @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Org.Apache.REEF.Utilities")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Org.Apache.REEF.Utilities")] +[assembly: AssemblyCopyright("Copyright © 2015")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("a7bda51a-552a-4fba-a834-f715c19454ab")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Utilities/ValidationUtilities.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Utilities/ValidationUtilities.cs b/lang/cs/Org.Apache.REEF.Utilities/ValidationUtilities.cs new file mode 100644 index 0000000..80507fc --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Utilities/ValidationUtilities.cs @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.Globalization;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Utilities
{
    /// <summary>
    /// Helpers for validating the process environment.
    /// </summary>
    public class ValidationUtilities
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ValidationUtilities));

        /// <summary>
        /// Reads an environment variable and insists that it has a non-blank value.
        /// </summary>
        /// <param name="env">Name of the environment variable to read.</param>
        /// <returns>The value of the variable.</returns>
        /// <remarks>
        /// When the variable is missing or blank, a notice is written to the console and
        /// an <see cref="InvalidOperationException"/> is handed to
        /// <c>Diagnostics.Exceptions.Throw</c>.
        /// </remarks>
        public static string ValidateEnvVariable(string env)
        {
            string value = Environment.GetEnvironmentVariable(env);
            if (!string.IsNullOrWhiteSpace(value))
            {
                return value;
            }

            string notice = string.Format(
                CultureInfo.InvariantCulture,
                "{0} not set. Please set the environment variable first. Exiting...",
                env);
            Console.WriteLine(notice);

            string failure = string.Format(CultureInfo.InvariantCulture, "No {0} found.", env);
            Diagnostics.Exceptions.Throw(new InvalidOperationException(failure), failure, LOGGER);

            return value;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Utilities/packages.config ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Utilities/packages.config b/lang/cs/Org.Apache.REEF.Utilities/packages.config new file mode 100644 index 0000000..c60eef8 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Utilities/packages.config @@ -0,0 +1,23 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<packages> + <package id="Microsoft.Hadoop.Avro" version="1.4.0.0" targetFramework="net45" /> + <package id="Newtonsoft.Json" version="6.0.8" targetFramework="net45" /> +</packages> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake.Tests/ClockTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/ClockTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/ClockTest.cs new file mode 100644 index 0000000..3af063f --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/ClockTest.cs @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reactive; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Wake.Time; +using Org.Apache.REEF.Wake.Time.Runtime; +using Org.Apache.REEF.Wake.Time.Runtime.Event; + +namespace Org.Apache.REEF.Wake.Tests +{ + [TestClass] + public class ClockTest + { + [TestMethod] + public void TestClock() + { + using (RuntimeClock clock = BuildClock()) + { + Task.Run(new Action(clock.Run)); + + var heartBeat = new HeartbeatObserver(clock); + heartBeat.OnNext(null); + Thread.Sleep(5000); + + Assert.AreEqual(100, heartBeat.EventCount); + } + } + + [TestMethod] + public void TestAlarmRegistrationRaceConditions() + { + using (RuntimeClock clock = BuildClock()) + { + Task.Run(new Action(clock.Run)); + + List<Alarm> events1 = new List<Alarm>(); + List<Alarm> events2 = new List<Alarm>(); + + // Observers to record events that they have processed + IObserver<Alarm> earlierRecorder = Observer.Create<Alarm>(events1.Add); + IObserver<Alarm> laterRecorder = Observer.Create<Alarm>(events2.Add); + + // Schedule a later alarm in the future + clock.ScheduleAlarm(5000, laterRecorder); + + // After 1 second, schedule an earlier alarm that will fire before the later alarm + Thread.Sleep(1000); + clock.ScheduleAlarm(2000, earlierRecorder); + + // The earlier alarm should not have fired after 1 second + Thread.Sleep(1000); + Assert.AreEqual(0, events1.Count); + + // The earlier alarm will have fired after another 1.5 seconds, but the later will have not + Thread.Sleep(1500); + Assert.AreEqual(1, events1.Count); + Assert.AreEqual(0, events2.Count); + + // The later alarm will have fired after 2 seconds + Thread.Sleep(2000); + Assert.AreEqual(1, events1.Count); + } + } + + [TestMethod] + public void TestSimulatenousAlarms() + { + using (RuntimeClock 
clock = BuildClock()) + { + Task.Run(new Action(clock.Run)); + + List<Alarm> events = new List<Alarm>(); + IObserver<Alarm> eventRecorder = Observer.Create<Alarm>(events.Add); + + clock.ScheduleAlarm(1000, eventRecorder); + clock.ScheduleAlarm(1000, eventRecorder); + clock.ScheduleAlarm(1000, eventRecorder); + + Thread.Sleep(1500); + Assert.AreEqual(3, events.Count); + } + } + + [TestMethod] + public void TestAlarmOrder() + { + using (RuntimeClock clock = BuildLogicalClock()) + { + Task.Run(new Action(clock.Run)); + + // Event handler to record event time stamps + List<long> recordedTimestamps = new List<long>(); + IObserver<Alarm> eventRecorder = Observer.Create<Alarm>(alarm => recordedTimestamps.Add(alarm.TimeStamp)); + + // Schedule 10 alarms every 100 ms + List<long> expectedTimestamps = Enumerable.Range(0, 10).Select(offset => (long)offset * 100).ToList(); + expectedTimestamps.ForEach(offset => clock.ScheduleAlarm(offset, eventRecorder)); + + // Check that the recorded timestamps are in the same order that they were scheduled + Thread.Sleep(1500); + Assert.IsTrue(expectedTimestamps.SequenceEqual(recordedTimestamps)); + } + } + + private RuntimeClock BuildClock() + { + var builder = TangFactory.GetTang().NewConfigurationBuilder(); + + return TangFactory.GetTang() + .NewInjector(builder.Build()) + .GetInstance<RuntimeClock>(); + } + + private RuntimeClock BuildLogicalClock() + { + var builder = TangFactory.GetTang().NewConfigurationBuilder(); + builder.BindImplementation(GenericType<ITimer>.Class, GenericType<LogicalTimer>.Class); + + return TangFactory.GetTang() + .NewInjector(builder.Build()) + .GetInstance<RuntimeClock>(); + } + + private class HeartbeatObserver : IObserver<Alarm> + { + private RuntimeClock _clock; + + public HeartbeatObserver(RuntimeClock clock) + { + _clock = clock; + EventCount = 0; + } + + public int EventCount { get; set; } + + public void OnNext(Alarm value) + { + EventCount++; + if (EventCount < 100) + { + _clock.ScheduleAlarm(10, 
this); + } + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake.Tests/MultiCodecTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/MultiCodecTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/MultiCodecTest.cs new file mode 100644 index 0000000..61e8ebd --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/MultiCodecTest.cs @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System; +using System.Text; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Wake.Remote.Impl; + +namespace Org.Apache.REEF.Wake.Tests +{ + [TestClass] + public class MultiCodecTest + { + [TestMethod] + public void TestMultiCodec() + { + MultiCodec<BaseEvent> codec = new MultiCodec<BaseEvent>(); + codec.Register(new Event1Codec()); + codec.Register(new Event2Codec()); + + byte[] d1Data = codec.Encode(new Event1(42)); + byte[] d2Data = codec.Encode(new Event2("Tony")); + + Event1 e1 = (Event1)codec.Decode(d1Data); + Event2 e2 = (Event2)codec.Decode(d2Data); + + Assert.AreEqual(42, e1.Number); + Assert.AreEqual("Tony", e2.Name); + } + + private class BaseEvent + { + } + + private class Event1 : BaseEvent + { + public Event1(int number) + { + Number = number; + } + + public int Number { get; set; } + } + + private class Event2 : BaseEvent + { + public Event2(string name) + { + Name = name; + } + + public string Name { get; set; } + } + + private class Event1Codec : ICodec<Event1> + { + public byte[] Encode(Event1 obj) + { + return BitConverter.GetBytes(obj.Number); + } + + public Event1 Decode(byte[] data) + { + return new Event1(BitConverter.ToInt32(data, 0)); + } + } + + private class Event2Codec : ICodec<Event2> + { + public byte[] Encode(Event2 obj) + { + return Encoding.ASCII.GetBytes(obj.Name); + } + + public Event2 Decode(byte[] data) + { + return new Event2(Encoding.ASCII.GetString(data)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj new file mode 100644 index 0000000..3ae017d --- /dev/null +++ 
b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj @@ -0,0 +1,124 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" /> + <PropertyGroup> + <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> + <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform> + <ProjectGuid>{214C64C6-04E5-4867-B69A-E3502EA50871}</ProjectGuid> + <OutputType>Library</OutputType> + <AppDesignerFolder>Properties</AppDesignerFolder> + <RootNamespace>Org.Apache.REEF.Wake.Tests</RootNamespace> + <AssemblyName>Org.Apache.REEF.Wake.Tests</AssemblyName> + <TargetFrameworkVersion>v4.5</TargetFrameworkVersion> + <FileAlignment>512</FileAlignment> + <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\</SolutionDir> + <RestorePackages>true</RestorePackages> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> + 
<PlatformTarget>AnyCPU</PlatformTarget> + <DebugSymbols>true</DebugSymbols> + <DebugType>full</DebugType> + <Optimize>false</Optimize> + <OutputPath>..\bin\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath> + <DefineConstants>DEBUG;TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x64' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugSymbols>true</DebugSymbols> + <DebugType>full</DebugType> + <Optimize>false</Optimize> + <OutputPath>..\bin\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath> + <DefineConstants>DEBUG;TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugType>pdbonly</DebugType> + <Optimize>true</Optimize> + <OutputPath>..\bin\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath> + <DefineConstants>TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x64' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugType>pdbonly</DebugType> + <Optimize>true</Optimize> + <OutputPath>..\bin\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath> + <DefineConstants>TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <ItemGroup> + <Reference Include="Microsoft.VisualStudio.QualityTools.UnitTestFramework, Version=10.1.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL" /> + <Reference Include="System" /> + <Reference Include="System.Core" /> + <Reference Include="System.Reactive.Core"> + <HintPath>..\packages\Rx-Core.2.2.5\lib\net45\System.Reactive.Core.dll</HintPath> + </Reference> + <Reference 
Include="System.Reactive.Interfaces"> + <HintPath>..\packages\Rx-Interfaces.2.2.5\lib\net45\System.Reactive.Interfaces.dll</HintPath> + </Reference> + <Reference Include="System.Xml.Linq" /> + <Reference Include="System.Data.DataSetExtensions" /> + <Reference Include="Microsoft.CSharp" /> + <Reference Include="System.Data" /> + <Reference Include="System.Xml" /> + </ItemGroup> + <ItemGroup> + <Compile Include="ClockTest.cs" /> + <Compile Include="MultiCodecTest.cs" /> + <Compile Include="Properties\AssemblyInfo.cs" /> + <Compile Include="PubSubSubjectTest.cs" /> + <Compile Include="RemoteManagerTest.cs" /> + <Compile Include="TransportTest.cs" /> + </ItemGroup> + <ItemGroup> + <ProjectReference Include="..\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj"> + <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project> + <Name>Org.Apache.REEF.Tang</Name> + </ProjectReference> + <ProjectReference Include="..\Org.Apache.REEF.Utilities\Org.Apache.REEF.Utilities.csproj"> + <Project>{79e7f89a-1dfb-45e1-8d43-d71a954aeb98}</Project> + <Name>Org.Apache.REEF.Utilities</Name> + </ProjectReference> + <ProjectReference Include="..\Org.Apache.REEF.Wake\Org.Apache.REEF.Wake.csproj"> + <Project>{cdfb3464-4041-42b1-9271-83af24cd5008}</Project> + <Name>Org.Apache.REEF.Wake</Name> + </ProjectReference> + </ItemGroup> + <ItemGroup> + <None Include="packages.config" /> + </ItemGroup> + <ItemGroup> + <Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" /> + </ItemGroup> + <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> + <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" /> + <!-- To modify your build process, add your task inside one of the targets below and uncomment it. + Other similar extension points exist, see Microsoft.Common.targets. 
+ <Target Name="BeforeBuild"> + </Target> + <Target Name="AfterBuild"> + </Target> + --> +</Project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake.Tests/Properties/AssemblyInfo.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..47e58c2 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Properties/AssemblyInfo.cs @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. 
+[assembly: AssemblyTitle("Org.Apache.REEF.Wake.Tests")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Org.Apache.REEF.Wake.Tests")] +[assembly: AssemblyCopyright("Copyright © 2015")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("68a2ef80-e51b-4abb-9ccc-81354e152758")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake.Tests/PubSubSubjectTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/PubSubSubjectTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/PubSubSubjectTest.cs new file mode 100644 index 0000000..7ce56d3 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/PubSubSubjectTest.cs @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Reactive; +using System.Threading; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.REEF.Wake.RX.Impl; + +namespace Org.Apache.REEF.Wake.Tests +{ + [TestClass] + public class PubSubSubjectTest + { + [TestMethod] + public void TestPubSubSubjectSingleThread() + { + int sum = 0; + + // Observer that adds sum of numbers up to and including x + PubSubSubject<int> subject = new PubSubSubject<int>(); + subject.Subscribe(Observer.Create<int>( + x => + { + for (int i = 0; i <= x; i++) + { + sum += i; + } + })); + + subject.OnNext(10); + subject.OnCompleted(); + Assert.AreEqual(sum, 55); + } + + [TestMethod] + public void TestPubSubSubjectMultipleThreads() + { + int sum = 0; + + PubSubSubject<int> subject = new PubSubSubject<int>(); + subject.Subscribe(Observer.Create<int>(x => sum += x)); + + Thread[] threads = new Thread[10]; + for (int i = 0; i < threads.Length; i++) + { + threads[i] = new Thread(() => + { + for (int j = 0; j < 10000; j++) + { + subject.OnNext(1); + } + }); + + threads[i].Start(); + } + + foreach (Thread thread in threads) + { + thread.Join(); + } + + Assert.AreEqual(sum, 100000); + } + + [TestMethod] + public void TestMultipleTypes() + { + int sum1 = 0; + int sum2 = 0; + + PubSubSubject<SuperEvent> subject = new PubSubSubject<SuperEvent>(); + subject.Subscribe(Observer.Create<SubEvent1>(x => sum1 += 100)); + subject.Subscribe(Observer.Create<SubEvent2>(x => sum2 += 500)); + + subject.OnNext(new SubEvent1()); + subject.OnNext(new SubEvent2()); + subject.OnNext(new SubEvent2()); + + 
Assert.AreEqual(sum1, 100); + Assert.AreEqual(sum2, 1000); + } + + [TestMethod] + public void TestOnCompleted() + { + int sum = 0; + + PubSubSubject<int> subject = new PubSubSubject<int>(); + subject.Subscribe(Observer.Create<int>(x => sum += x)); + + subject.OnNext(10); + Assert.AreEqual(10, sum); + + subject.OnNext(10); + Assert.AreEqual(20, sum); + + // Check that after calling OnCompleted, OnNext will do nothing + subject.OnCompleted(); + subject.OnNext(10); + Assert.AreEqual(20, sum); + } + + [TestMethod] + public void TestOnError() + { + int sum = 0; + + PubSubSubject<int> subject = new PubSubSubject<int>(); + subject.Subscribe(Observer.Create<int>(x => sum += x)); + + subject.OnNext(10); + Assert.AreEqual(10, sum); + + subject.OnNext(10); + Assert.AreEqual(20, sum); + + // Check that after calling OnError, OnNext will do nothing + subject.OnError(new Exception("error")); + subject.OnNext(10); + Assert.AreEqual(20, sum); + } + + [TestMethod] + public void TestDisposeSingleSubject() + { + int sum = 0; + + PubSubSubject<int> subject = new PubSubSubject<int>(); + var disposable = subject.Subscribe(Observer.Create<int>(x => sum += x)); + + subject.OnNext(10); + subject.OnNext(10); + subject.OnNext(10); + Assert.AreEqual(30, sum); + + // Unregister the subject and check that calling OnNext does nothing + disposable.Dispose(); + subject.OnNext(10); + Assert.AreEqual(30, sum); + } + + [TestMethod] + public void TestDisposeMultipleSubjects() + { + int sum1 = 0; + int sum2 = 0; + + SubEvent1 event1 = new SubEvent1(); + SubEvent2 event2 = new SubEvent2(); + + PubSubSubject<SuperEvent> subject = new PubSubSubject<SuperEvent>(); + var disposable1 = subject.Subscribe(Observer.Create<SubEvent1>(x => sum1 += 100)); + var disposable2 = subject.Subscribe(Observer.Create<SubEvent2>(x => sum2 += 500)); + + subject.OnNext(event1); + subject.OnNext(event2); + subject.OnNext(event2); + Assert.AreEqual(sum1, 100); + Assert.AreEqual(sum2, 1000); + + // Check that unsubscribing from 
SubEvent1 does not affect other subscriptions + disposable1.Dispose(); + subject.OnNext(event1); + subject.OnNext(event2); + Assert.AreEqual(sum1, 100); + Assert.AreEqual(sum2, 1500); + + // Unsubscribe from the remaining event types + disposable2.Dispose(); + subject.OnNext(event1); + subject.OnNext(event2); + Assert.AreEqual(sum1, 100); + Assert.AreEqual(sum2, 1500); + } + + class SuperEvent + { + } + + class SubEvent1 : SuperEvent + { + } + + class SubEvent2 : SuperEvent + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake.Tests/RemoteManagerTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/RemoteManagerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/RemoteManagerTest.cs new file mode 100644 index 0000000..3b3ac6d --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/RemoteManagerTest.cs @@ -0,0 +1,338 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Net; +using System.Reactive; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Wake.Remote.Impl; +using Org.Apache.REEF.Wake.Util; + +namespace Org.Apache.REEF.Wake.Tests +{ + [TestClass] + public class RemoteManagerTest + { + [TestMethod] + public void TestOneWayCommunication() + { + IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); + + BlockingCollection<string> queue = new BlockingCollection<string>(); + List<string> events = new List<string>(); + + using (var remoteManager1 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec())) + using (var remoteManager2 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec())) + { + var observer = Observer.Create<string>(queue.Add); + IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0); + remoteManager2.RegisterObserver(endpoint1, observer); + + var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint); + remoteObserver.OnNext("abc"); + remoteObserver.OnNext("def"); + remoteObserver.OnNext("ghi"); + + events.Add(queue.Take()); + events.Add(queue.Take()); + events.Add(queue.Take()); + } + + Assert.AreEqual(3, events.Count); + } + + [TestMethod] + public void TestOneWayCommunicationClientOnly() + { + int listeningPort = NetworkUtils.GenerateRandomPort(6000, 7000); + IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); + + BlockingCollection<string> queue = new BlockingCollection<string>(); + List<string> events = new List<string>(); + + using (var remoteManager1 = new DefaultRemoteManager<string>(new StringCodec())) + using (var remoteManager2 = new DefaultRemoteManager<string>(listeningAddress, listeningPort, new StringCodec())) + { + IPEndPoint remoteEndpoint = new IPEndPoint(listeningAddress, 0); + var observer = Observer.Create<string>(queue.Add); + 
remoteManager2.RegisterObserver(remoteEndpoint, observer); + + var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint); + remoteObserver.OnNext("abc"); + remoteObserver.OnNext("def"); + remoteObserver.OnNext("ghi"); + + events.Add(queue.Take()); + events.Add(queue.Take()); + events.Add(queue.Take()); + } + + Assert.AreEqual(3, events.Count); + } + + [TestMethod] + public void TestTwoWayCommunication() + { + IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); + + BlockingCollection<string> queue1 = new BlockingCollection<string>(); + BlockingCollection<string> queue2 = new BlockingCollection<string>(); + List<string> events1 = new List<string>(); + List<string> events2 = new List<string>(); + + using (var remoteManager1 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec())) + using (var remoteManager2 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec())) + { + // Register observers for remote manager 1 and remote manager 2 + var remoteEndpoint = new IPEndPoint(listeningAddress, 0); + var observer1 = Observer.Create<string>(queue1.Add); + var observer2 = Observer.Create<string>(queue2.Add); + remoteManager1.RegisterObserver(remoteEndpoint, observer1); + remoteManager2.RegisterObserver(remoteEndpoint, observer2); + + // Remote manager 1 sends 3 events to remote manager 2 + var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint); + remoteObserver1.OnNext("abc"); + remoteObserver1.OnNext("def"); + remoteObserver1.OnNext("ghi"); + + // Remote manager 2 sends 4 events to remote manager 1 + var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint); + remoteObserver2.OnNext("jkl"); + remoteObserver2.OnNext("mno"); + remoteObserver2.OnNext("pqr"); + remoteObserver2.OnNext("stu"); + + events1.Add(queue1.Take()); + events1.Add(queue1.Take()); + events1.Add(queue1.Take()); + events1.Add(queue1.Take()); + + events2.Add(queue2.Take()); + 
events2.Add(queue2.Take()); + events2.Add(queue2.Take()); + } + + Assert.AreEqual(4, events1.Count); + Assert.AreEqual(3, events2.Count); + } + + [TestMethod] + public void TestCommunicationThreeNodesOneWay() + { + IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); + + BlockingCollection<string> queue = new BlockingCollection<string>(); + List<string> events = new List<string>(); + + using (var remoteManager1 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec())) + using (var remoteManager2 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec())) + using (var remoteManager3 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec())) + { + var remoteEndpoint = new IPEndPoint(listeningAddress, 0); + var observer = Observer.Create<string>(queue.Add); + remoteManager3.RegisterObserver(remoteEndpoint, observer); + + var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint); + var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint); + + remoteObserver2.OnNext("abc"); + remoteObserver1.OnNext("def"); + remoteObserver2.OnNext("ghi"); + remoteObserver1.OnNext("jkl"); + remoteObserver2.OnNext("mno"); + + for (int i = 0; i < 5; i++) + { + events.Add(queue.Take()); + } + } + + Assert.AreEqual(5, events.Count); + } + + [TestMethod] + public void TestCommunicationThreeNodesBothWays() + { + IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); + + BlockingCollection<string> queue1 = new BlockingCollection<string>(); + BlockingCollection<string> queue2 = new BlockingCollection<string>(); + BlockingCollection<string> queue3 = new BlockingCollection<string>(); + List<string> events1 = new List<string>(); + List<string> events2 = new List<string>(); + List<string> events3 = new List<string>(); + + using (var remoteManager1 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec())) + using (var remoteManager2 = new 
DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec())) + using (var remoteManager3 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec())) + { + var remoteEndpoint = new IPEndPoint(listeningAddress, 0); + + var observer = Observer.Create<string>(queue1.Add); + remoteManager1.RegisterObserver(remoteEndpoint, observer); + var observer2 = Observer.Create<string>(queue2.Add); + remoteManager2.RegisterObserver(remoteEndpoint, observer2); + var observer3 = Observer.Create<string>(queue3.Add); + remoteManager3.RegisterObserver(remoteEndpoint, observer3); + + var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint); + var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint); + + // Observer 1 and 2 send messages to observer 3 + remoteObserver1.OnNext("abc"); + remoteObserver1.OnNext("abc"); + remoteObserver1.OnNext("abc"); + remoteObserver2.OnNext("def"); + remoteObserver2.OnNext("def"); + + // Observer 3 sends messages back to observers 1 and 2 + var remoteObserver3a = remoteManager3.GetRemoteObserver(remoteManager1.LocalEndpoint); + var remoteObserver3b = remoteManager3.GetRemoteObserver(remoteManager2.LocalEndpoint); + + remoteObserver3a.OnNext("ghi"); + remoteObserver3a.OnNext("ghi"); + remoteObserver3b.OnNext("jkl"); + remoteObserver3b.OnNext("jkl"); + remoteObserver3b.OnNext("jkl"); + + events1.Add(queue1.Take()); + events1.Add(queue1.Take()); + + events2.Add(queue2.Take()); + events2.Add(queue2.Take()); + events2.Add(queue2.Take()); + + events3.Add(queue3.Take()); + events3.Add(queue3.Take()); + events3.Add(queue3.Take()); + events3.Add(queue3.Take()); + events3.Add(queue3.Take()); + } + + Assert.AreEqual(2, events1.Count); + Assert.AreEqual(3, events2.Count); + Assert.AreEqual(5, events3.Count); + } + + [TestMethod] + public void TestRemoteSenderCallback() + { + IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); + + BlockingCollection<string> queue = new 
BlockingCollection<string>(); + List<string> events = new List<string>(); + + using (var remoteManager1 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec())) + using (var remoteManager2 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec())) + { + // Register handler for when remote manager 2 receives events; respond + // with an ack + var remoteEndpoint = new IPEndPoint(listeningAddress, 0); + var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint); + + var receiverObserver = Observer.Create<string>( + message => remoteObserver2.OnNext("received message: " + message)); + remoteManager2.RegisterObserver(remoteEndpoint, receiverObserver); + + // Register handler for remote manager 1 to record the ack + var senderObserver = Observer.Create<string>(queue.Add); + remoteManager1.RegisterObserver(remoteEndpoint, senderObserver); + + // Begin to send messages + var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint); + remoteObserver1.OnNext("hello"); + remoteObserver1.OnNext("there"); + remoteObserver1.OnNext("buddy"); + + events.Add(queue.Take()); + events.Add(queue.Take()); + events.Add(queue.Take()); + } + + Assert.AreEqual(3, events.Count); + Assert.AreEqual("received message: hello", events[0]); + Assert.AreEqual("received message: there", events[1]); + Assert.AreEqual("received message: buddy", events[2]); + } + + [TestMethod] + public void TestRegisterObserverByType() + { + IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); + + BlockingCollection<string> queue = new BlockingCollection<string>(); + List<string> events = new List<string>(); + + using (var remoteManager1 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec())) + using (var remoteManager2 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec())) + { + // RemoteManager2 listens and records events of type IRemoteEvent<string> + var observer = 
Observer.Create<IRemoteMessage<string>>(message => queue.Add(message.Message)); + remoteManager2.RegisterObserver(observer); + + // Remote manager 1 sends 3 events to remote manager 2 + var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint); + remoteObserver.OnNext("abc"); + remoteObserver.OnNext("def"); + remoteObserver.OnNext("ghi"); + + events.Add(queue.Take()); + events.Add(queue.Take()); + events.Add(queue.Take()); + } + + Assert.AreEqual(3, events.Count); + } + + [TestMethod] + public void TestCachedConnection() + { + IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); + + BlockingCollection<string> queue = new BlockingCollection<string>(); + List<string> events = new List<string>(); + + using (var remoteManager1 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec())) + using (var remoteManager2 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec())) + { + var observer = Observer.Create<string>(queue.Add); + IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0); + remoteManager2.RegisterObserver(endpoint1, observer); + + var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint); + remoteObserver.OnNext("abc"); + remoteObserver.OnNext("def"); + + var cachedObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint); + cachedObserver.OnNext("ghi"); + cachedObserver.OnNext("jkl"); + + events.Add(queue.Take()); + events.Add(queue.Take()); + events.Add(queue.Take()); + events.Add(queue.Take()); + } + + Assert.AreEqual(4, events.Count); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs new file mode 100644 index 0000000..3e67e0d --- /dev/null +++ 
b/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Net;
+using System.Reactive;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+    /// <summary>
+    /// Exercises TransportServer and TransportClient end to end over the loopback address.
+    /// </summary>
+    [TestClass]
+    public class TransportTest
+    {
+        /// <summary>
+        /// Longest time, in milliseconds, a test waits for one event or for its client tasks.
+        /// </summary>
+        private const int ReceiveTimeoutMs = 30000;
+
+        /// <summary>
+        /// Sends three strings from one client and waits for the server handler to receive three events.
+        /// </summary>
+        [TestMethod]
+        public void TestTransportServer()
+        {
+            ICodec<string> codec = new StringCodec();
+            int port = NetworkUtils.GenerateRandomPort(6000, 7000);
+
+            BlockingCollection<string> queue = new BlockingCollection<string>();
+            List<string> events = new List<string>();
+
+            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);
+            var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data));
+
+            using (var server = new TransportServer<string>(endpoint, remoteHandler, codec))
+            {
+                server.Run();
+
+                IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
+                using (var client = new TransportClient<string>(remoteEndpoint, codec))
+                {
+                    client.Send("Hello");
+                    client.Send(", ");
+                    client.Send("World!");
+
+                    events.AddRange(TakeEvents(queue, 3));
+                }
+            }
+
+            Assert.AreEqual(3, events.Count);
+        }
+
+        /// <summary>
+        /// Same as TestTransportServer, but the payload is a TestEvent encoded by TestEventCodec.
+        /// </summary>
+        [TestMethod]
+        public void TestTransportServerEvent()
+        {
+            ICodec<TestEvent> codec = new TestEventCodec();
+            int port = NetworkUtils.GenerateRandomPort(6000, 7000);
+
+            BlockingCollection<TestEvent> queue = new BlockingCollection<TestEvent>();
+            List<TestEvent> events = new List<TestEvent>();
+
+            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);
+            var remoteHandler = Observer.Create<TransportEvent<TestEvent>>(tEvent => queue.Add(tEvent.Data));
+
+            using (var server = new TransportServer<TestEvent>(endpoint, remoteHandler, codec))
+            {
+                server.Run();
+
+                IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
+                using (var client = new TransportClient<TestEvent>(remoteEndpoint, codec))
+                {
+                    client.Send(new TestEvent("Hello"));
+                    client.Send(new TestEvent(", "));
+                    client.Send(new TestEvent("World!"));
+
+                    events.AddRange(TakeEvents(queue, 3));
+                }
+            }
+
+            Assert.AreEqual(3, events.Count);
+        }
+
+        /// <summary>
+        /// Has the server write each message back on the link it arrived on and waits for the
+        /// client-side handler to receive three events.
+        /// </summary>
+        [TestMethod]
+        public void TestTransportSenderStage()
+        {
+            ICodec<string> codec = new StringCodec();
+            int port = NetworkUtils.GenerateRandomPort(6000, 7000);
+            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);
+
+            List<string> events = new List<string>();
+            BlockingCollection<string> queue = new BlockingCollection<string>();
+
+            // Server echoes the message back to the client
+            var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent => tEvent.Link.Write(tEvent.Data));
+
+            using (TransportServer<string> server = new TransportServer<string>(endpoint, remoteHandler, codec))
+            {
+                server.Run();
+
+                var clientHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data));
+                IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
+                using (var client = new TransportClient<string>(remoteEndpoint, codec, clientHandler))
+                {
+                    client.Send("Hello");
+                    client.Send(", ");
+                    client.Send(" World");
+
+                    events.AddRange(TakeEvents(queue, 3));
+                }
+            }
+
+            Assert.AreEqual(3, events.Count);
+        }
+
+        /// <summary>
+        /// Starts many clients at once, each sending three strings, and checks that the server
+        /// handler receives every one of them.
+        /// </summary>
+        [TestMethod]
+        public void TestRaceCondition()
+        {
+            ICodec<string> codec = new StringCodec();
+            int port = NetworkUtils.GenerateRandomPort(6000, 7000);
+
+            BlockingCollection<string> queue = new BlockingCollection<string>();
+            List<string> events = new List<string>();
+            int numEventsExpected = 150;
+
+            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);
+            var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data));
+
+            using (var server = new TransportServer<string>(endpoint, remoteHandler, codec))
+            {
+                server.Run();
+
+                // Keep the client tasks so that a failure inside one is observed below
+                // instead of being dropped with the discarded Task.
+                Task[] clients = new Task[numEventsExpected / 3];
+                for (int i = 0; i < clients.Length; i++)
+                {
+                    clients[i] = Task.Run(() =>
+                    {
+                        IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
+                        using (var client = new TransportClient<string>(remoteEndpoint, codec))
+                        {
+                            client.Send("Hello");
+                            client.Send(", ");
+                            client.Send("World!");
+                        }
+                    });
+                }
+
+                // WaitAll rethrows client exceptions as an AggregateException; doing this before
+                // draining the queue reports the cause rather than a receive timeout.
+                Assert.IsTrue(Task.WaitAll(clients, ReceiveTimeoutMs), "Timed out waiting for the client tasks");
+
+                events.AddRange(TakeEvents(queue, numEventsExpected));
+            }
+
+            Assert.AreEqual(numEventsExpected, events.Count);
+        }
+
+        /// <summary>
+        /// Takes <paramref name="count"/> items from <paramref name="queue"/>, failing the test
+        /// if any one of them does not arrive within <see cref="ReceiveTimeoutMs"/>.
+        /// </summary>
+        /// <typeparam name="T">The event type held by the queue</typeparam>
+        /// <param name="queue">The queue the handler under test adds to</param>
+        /// <param name="count">How many items to take</param>
+        /// <returns>The items taken, in the order they were removed</returns>
+        private static List<T> TakeEvents<T>(BlockingCollection<T> queue, int count)
+        {
+            List<T> received = new List<T>(count);
+            for (int i = 0; i < count; i++)
+            {
+                T item;
+                if (!queue.TryTake(out item, ReceiveTimeoutMs))
+                {
+                    Assert.Fail("Timed out waiting for event " + (i + 1) + " of " + count);
+                }
+
+                received.Add(item);
+            }
+
+            return received;
+        }
+
+        /// <summary>
+        /// Minimal event type carrying a single string message.
+        /// </summary>
+        private class TestEvent
+        {
+            public TestEvent(string message)
+            {
+                Message = message;
+            }
+
+            public string Message { get; set; }
+
+            public override string ToString()
+            {
+                return "TestEvent: " + Message;
+            }
+        }
+
+        /// <summary>
+        /// Codec for TestEvent that encodes and decodes the message with StringCodec.
+        /// </summary>
+        private class TestEventCodec : ICodec<TestEvent>
+        {
+            public byte[] Encode(TestEvent obj)
+            {
+                return new StringCodec().Encode(obj.Message);
+            }
+
+            public TestEvent Decode(byte[] data)
+            {
+                return new TestEvent(new StringCodec().Decode(data));
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config
---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config b/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config new file mode 100644 index 0000000..75e5b34 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config @@ -0,0 +1,23 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<packages> + <package id="Rx-Core" version="2.2.5" targetFramework="net45" /> + <package id="Rx-Interfaces" version="2.2.5" targetFramework="net45" /> +</packages> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/AbstractEStage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/AbstractEStage.cs b/lang/cs/Org.Apache.REEF.Wake/AbstractEStage.cs new file mode 100644 index 0000000..ec78c82 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/AbstractEStage.cs @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake
+{
+    /// <summary>
+    /// Base class for estages; meant to carry metering, which this class does not implement yet
+    /// </summary>
+    /// <typeparam name="T">The type of event the estage handles</typeparam>
+    public abstract class AbstractEStage<T> : IEStage<T>
+    {
+        /// <summary>Constructs an abstract estage; the meter name is currently ignored</summary>
+        /// <param name="meterName">the meter name (not used by this base class)</param>
+        protected AbstractEStage(string meterName)
+        {
+        }
+
+        /// <summary>Accepts an event; this base implementation does nothing with it</summary>
+        /// <param name="value">an event</param>
+        public virtual void OnNext(T value)
+        {
+        }
+
+        public abstract void Dispose();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/IEStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/IEStage.cs b/lang/cs/Org.Apache.REEF.Wake/IEStage.cs
new file mode 100644
index 0000000..d9e91b6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/IEStage.cs
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake
+{
+    /// <summary>A stage that is also an event handler for events of type T</summary>
+    public interface IEStage<T> : IEventHandler<T>, IStage
+    {
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/IEventHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/IEventHandler.cs b/lang/cs/Org.Apache.REEF.Wake/IEventHandler.cs
new file mode 100644
index 0000000..6ee267d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/IEventHandler.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake
+{
+    /// <summary>
+    /// Receives events of type T
+    /// </summary>
+    /// <typeparam name="T">The event type this handler accepts</typeparam>
+    public interface IEventHandler<T>
+    {
+        /// <summary>
+        /// Handles one event
+        /// </summary>
+        /// <param name="value">The event to handle</param>
+        void OnNext(T value);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/IIdentifier.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/IIdentifier.cs b/lang/cs/Org.Apache.REEF.Wake/IIdentifier.cs
new file mode 100644
index 0000000..3ccaf1f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/IIdentifier.cs
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake
+{
+    /// <summary>
+    /// Identifier class for REEF.
+    ///
+    /// Identifiers are a generic naming primitive that carry some information about
+    /// the type of object that they point to.
+    ///
+    /// Examples include remote sockets or filenames.
+    /// </summary>
+    public abstract class IIdentifier
+    {
+        /// <summary>
+        /// Computes the hash code of this identifier; subclasses must implement it
+        /// </summary>
+        /// <returns>The hash code of this identifier</returns>
+        public abstract override int GetHashCode();
+
+        /// <summary>
+        /// Determines whether another object is equal to this identifier; subclasses must implement it
+        /// </summary>
+        /// <param name="o">The object to compare against</param>
+        /// <returns>True if <paramref name="o"/> equals this identifier; false otherwise</returns>
+        public abstract override bool Equals(object o);
+
+        /// <summary>
+        /// Produces the string form of this identifier; subclasses must implement it
+        /// </summary>
+        /// <returns>The string form of this identifier</returns>
+        public abstract override string ToString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/IIdentifierFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/IIdentifierFactory.cs b/lang/cs/Org.Apache.REEF.Wake/IIdentifierFactory.cs
new file mode 100644
index 0000000..9e781f3
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/IIdentifierFactory.cs
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake
+{
+    /// <summary>
+    /// Factory that creates an IIdentifier from a string.
+    /// StringIdentifierFactory is declared as the default implementation by the attribute below.
+    /// </summary>
+    [DefaultImplementation(typeof(StringIdentifierFactory))]
+    public interface IIdentifierFactory
+    {
+        /// <summary>Creates an identifier from a string</summary>
+        /// <param name="s">The string to build the identifier from</param>
+        /// <returns>The identifier created for <paramref name="s"/></returns>
+        IIdentifier Create(string s);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/IObserverFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/IObserverFactory.cs b/lang/cs/Org.Apache.REEF.Wake/IObserverFactory.cs
new file mode 100644
index 0000000..d43e4b3
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/IObserverFactory.cs
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake
+{
+    /// <summary>Factory with a single parameterless Create method</summary>
+    public interface IObserverFactory
+    {
+        /// <summary>Creates an object (presumably an observer, going by the interface name; the return type is plain object)</summary>
+        /// <returns>The created object</returns>
+        object Create();
+    }
+}
