http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/IStage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/IStage.cs b/lang/cs/Org.Apache.REEF.Wake/IStage.cs new file mode 100644 index 0000000..d7121a8 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/IStage.cs @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; + +namespace Org.Apache.REEF.Wake +{ + /// <summary>Stage is an execution unit for events and provides a way to reclaim its resources + /// </summary> + public interface IStage : IDisposable + { + } +}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/LoggingEventHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/LoggingEventHandler.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/LoggingEventHandler.cs new file mode 100644 index 0000000..cd2bff5 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Impl/LoggingEventHandler.cs @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Tang.Annotations; +using System; + +namespace Org.Apache.REEF.Wake.Impl +{ + /// <summary>A logging event handler</summary> + public class LoggingEventHandler<T> : IObserver<T> + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(T)); + + [Inject] + public LoggingEventHandler() + { + } + + /// <summary>Logs the event</summary> + /// <param name="value">an event</param> + public void OnNext(T value) + { + LOGGER.Log(Level.Verbose, "Event: " + DateTime.Now + value); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/MissingStartHandlerHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/MissingStartHandlerHandler.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/MissingStartHandlerHandler.cs new file mode 100644 index 0000000..324eb61 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Impl/MissingStartHandlerHandler.cs @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Time; + +namespace Org.Apache.REEF.Wake.Impl +{ + public class MissingStartHandlerHandler : IObserver<StartTime> + { + [Inject] + public MissingStartHandlerHandler() + { + } + + public void OnNext(StartTime value) + { + // Do nothing, since we only use this for evaluator, not for driver. + // LOGGER.Log(Level.Info, "No binding to Clock.StartHandler. It is likely that the clock will immediately go idle and close."); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/MultiEventHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/MultiEventHandler.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/MultiEventHandler.cs new file mode 100644 index 0000000..b157c75 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Impl/MultiEventHandler.cs @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Wake.Impl +{ + /// <summary>Event handler that dispatches an event to a specific handler based on an event class type + /// </summary> + public class MultiEventHandler<T> : IEventHandler<T> + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(MultiEventHandler<T>)); + private readonly IDictionary<Type, IEventHandler<T>> _map; + + /// <summary>Constructs a multi-event handler</summary> + /// <param name="map">a map of class types to event handlers</param> + public MultiEventHandler(IDictionary<Type, IEventHandler<T>> map) + { + foreach (Type item in map.Keys) + { + if (!typeof(T).IsAssignableFrom(item)) + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new WakeRuntimeException(typeof(T) + " is not assignable from " + item), LOGGER); + } + } + _map = map; + } + + /// <summary> + /// Invokes a specific handler for the event class type if it exists + /// </summary> + /// <param name="value">The event to handle</param> + public void OnNext(T value) + { + IEventHandler<T> handler = null; + bool success = _map.TryGetValue(value.GetType(), out handler); + if (success) + { + handler.OnNext(value); + } + else + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new WakeRuntimeException("No event " + value.GetType() + " handler"), LOGGER); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/PeriodicEvent.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/PeriodicEvent.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/PeriodicEvent.cs new file mode 100644 index 0000000..543d342 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Impl/PeriodicEvent.cs @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.REEF.Wake.Impl +{ + /// <summary>Periodic event for timers</summary> + public class PeriodicEvent + { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/PubSubEventHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/PubSubEventHandler.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/PubSubEventHandler.cs new file mode 100644 index 0000000..38a36be --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Impl/PubSubEventHandler.cs @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Reflection; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Wake.Impl +{ + /// <summary> + /// Event handler to provide publish/subscribe interfaces + /// </summary> + /// <typeparam name="T">The type of event handler</typeparam> + public class PubSubEventHandler<T> : IEventHandler<T> + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(PubSubEventHandler<T>)); + + private Dictionary<Type, List<object>> _classToHandlersMap; + + /// <summary> + /// Construct a pub-sub event handler + /// </summary> + public PubSubEventHandler() + { + _classToHandlersMap = new Dictionary<Type, List<object>>(); + } + + /// <summary> + /// Subscribe an event handler for an event type + /// </summary> + /// <typeparam name="U">The type of event handler</typeparam> + /// <param name="handler">The event handler</param> + public void Subscribe<U>(IEventHandler<U> handler) where U : T + { + lock (_classToHandlersMap) + { + List<object> handlers; + if (!_classToHandlersMap.TryGetValue(typeof(U), out handlers)) + { + handlers = new List<object>(); + _classToHandlersMap[typeof(U)] = handlers; + } + handlers.Add(handler); + } + } + + /// <summary> + /// Invoke the subscribed handlers for the event class type + /// </summary> + /// <param name="value">The event to process</param> + public void OnNext(T value) + { + if (value == null) + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("value"), LOGGER); + } + + lock (_classToHandlersMap) + { + // Check that the event type has been subscribed + List<object> handlers; + if (!_classToHandlersMap.TryGetValue(value.GetType(), out handlers)) + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new ArgumentException("No event for type " + value.GetType()), LOGGER); + } + + // Invoke each handler for the event type + foreach (object handler in handlers) + { + Type handlerType = typeof(IEventHandler<>).MakeGenericType(new[] { value.GetType() }); + MethodInfo info = handlerType.GetMethod("OnNext"); + info.Invoke(handler, new[] { (object)value }); + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/SingleThreadStage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/SingleThreadStage.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/SingleThreadStage.cs new file mode 100644 index 0000000..e27f67e --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Impl/SingleThreadStage.cs @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Concurrent; +using System.Threading; + +namespace Org.Apache.REEF.Wake.Impl +{ + /// <summary>Single thread stage that runs the event handler</summary> + public class SingleThreadStage<T> : AbstractEStage<T> + { + private readonly BlockingCollection<T> queue; + + private readonly Thread thread; + + private bool interrupted; + + public SingleThreadStage(IEventHandler<T> handler, int capacity) : base(handler.GetType().FullName) + { + queue = new BlockingCollection<T>(capacity); + interrupted = false; + thread = new Thread(new ThreadStart(new Producer<T>(queue, handler, interrupted).Run)); + thread.Start(); + } + + /// <summary> + /// Puts the value to the queue, which will be processed by the handler later + /// if the queue is full, IllegalStateException is thrown + /// </summary> + /// <param name="value">the value</param> + public override void OnNext(T value) + { + base.OnNext(value); + queue.Add(value); + } + + /// <summary> + /// Closes the stage + /// </summary> + public override void Dispose() + { + interrupted = true; + thread.Interrupt(); + } + } + + /// <summary>Takes events from the queue and provides them to the handler</summary> + /// <typeparam name="T">The type</typeparam> + internal class Producer<T> + { + private readonly BlockingCollection<T> _queue; + + private readonly IEventHandler<T> _handler; + + private volatile bool _interrupted; + + internal Producer(BlockingCollection<T> queue, IEventHandler<T> handler, bool interrupted) + { + _queue = queue; + _handler = handler; + _interrupted = interrupted; + } + + public void Run() + { + while (true) + { + try + { + T value = _queue.Take(); + _handler.OnNext(value); + } + catch (Exception) + { + if (_interrupted) + { + break; + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/SyncStage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/SyncStage.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/SyncStage.cs new file mode 100644 index 0000000..c6a7b22 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Impl/SyncStage.cs @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Wake; + +namespace Org.Apache.REEF.Wake.Impl +{ + /// <summary>Stage that synchronously executes an event handler</summary> + public class SyncStage<T> : AbstractEStage<T> + { + private readonly IEventHandler<T> _handler; + + /// <summary>Constructs a synchronous stage</summary> + /// <param name="handler">an event handler</param> + public SyncStage(IEventHandler<T> handler) : base(handler.GetType().FullName) + { + _handler = handler; + } + + /// <summary>Invokes the handler for the event</summary> + /// <param name="value">an event</param> + public override void OnNext(T value) + { + base.OnNext(value); + _handler.OnNext(value); + } + + public override void Dispose() + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/ThreadPoolStage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/ThreadPoolStage.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/ThreadPoolStage.cs new file mode 100644 index 0000000..f1bb67d --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Impl/ThreadPoolStage.cs @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Threading; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Wake; +using Org.Apache.REEF.Wake.Util; + +namespace Org.Apache.REEF.Wake.Impl +{ + /// <summary>Stage that executes an event handler with a thread pool</summary> + public class ThreadPoolStage<T> : AbstractEStage<T> + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(ThreadPoolStage<T>)); + + private readonly IEventHandler<T> _handler; + + private readonly ITaskService _taskService; + + private readonly int _numThreads; + + /// <summary>Constructs a thread-pool stage</summary> + /// <param name="handler">An event handler to execute</param> + /// <param name="numThreads">The number of threads to use</param> + public ThreadPoolStage(IEventHandler<T> handler, int numThreads) + : base(handler.GetType().FullName) + { + _handler = handler; + if (numThreads <= 0) + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new WakeRuntimeException("numThreads " + numThreads + " is less than or equal to 0"), LOGGER); + } + _numThreads = numThreads; + _taskService = new FixedThreadPoolTaskService(numThreads); + } + + /// <summary>Constructs a thread-pool stage</summary> + /// <param name="handler">an event handler to execute</param> + /// <param name="taskService">an external executor service provided</param> + public ThreadPoolStage(IEventHandler<T> handler, ITaskService taskService) : base( + handler.GetType().FullName) + { + _handler = handler; + _numThreads = 0; + _taskService = taskService; + } + + /// <summary>Handles the event using a thread in the thread pool</summary> + /// <param name="value">an event</param> + public override void OnNext(T value) + { + base.OnNext(value); + _taskService.Execute(new _Startable_74(this, value).Start); + } + + /// <summary> + /// Closes resources + /// </summary> + public override void Dispose() + { + if (_numThreads > 0) + { + _taskService.Shutdown(); + } + } + + private sealed class _Startable_74 : IStartable + { + private readonly ThreadPoolStage<T> _enclosing; + private readonly T _value; + + public _Startable_74(ThreadPoolStage<T> enclosing, T value) + { + _enclosing = enclosing; + _value = value; + } + + public void Start() + { + _enclosing._handler.OnNext(_value); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/TimerStage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/TimerStage.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/TimerStage.cs new file mode 100644 index 0000000..f2f2f35 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Impl/TimerStage.cs @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Timers; + +using Org.Apache.REEF.Wake; + +namespace Org.Apache.REEF.Wake.Impl +{ + /// <summary>Stage that triggers an event handler periodically</summary> + public class TimerStage : IStage + { + //private readonly ScheduledExecutorService executor; + private readonly Timer _timer; + private readonly PeriodicEvent _value = new PeriodicEvent(); + private readonly IEventHandler<PeriodicEvent> _handler; + + /// <summary>Constructs a timer stage with no initial delay</summary> + /// <param name="handler">an event handler</param> + /// <param name="period">a period in milli-seconds</param> + public TimerStage(IEventHandler<PeriodicEvent> handler, long period) : this(handler, 0, period) + { + } + + /// <summary>Constructs a timer stage</summary> + /// <param name="handler">an event handler</param> + /// <param name="initialDelay">an initial delay</param> + /// <param name="period">a period in milli-seconds</param> + public TimerStage(IEventHandler<PeriodicEvent> handler, long initialDelay, long period) + { + _handler = handler; + _timer = new Timer(period); + _timer.Elapsed += (sender, e) => OnTimedEvent(sender, e, _handler, _value); + _timer.Enabled = true; + } + + /// <summary> + /// Closes resources + /// </summary> + public void Dispose() + { + _timer.Stop(); + } + + private static void OnTimedEvent(object source, ElapsedEventArgs e, IEventHandler<PeriodicEvent> handler, PeriodicEvent value) + { + handler.OnNext(value); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj new file mode 100644 index 0000000..f383707 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj @@ -0,0 +1,216 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" /> + <PropertyGroup> + <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> + <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform> + <ProjectGuid>{CDFB3464-4041-42B1-9271-83AF24CD5008}</ProjectGuid> + <OutputType>Library</OutputType> + <AppDesignerFolder>Properties</AppDesignerFolder> + <RootNamespace>Org.Apache.REEF.Wake</RootNamespace> + <AssemblyName>Org.Apache.REEF.Wake</AssemblyName> + <TargetFrameworkVersion>v4.5</TargetFrameworkVersion> + <FileAlignment>512</FileAlignment> + <RestorePackages>true</RestorePackages> + <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\</SolutionDir> + </PropertyGroup> + <Import Project="$(SolutionDir)\Source\build.props" /> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugSymbols>true</DebugSymbols> + <DebugType>full</DebugType> + <Optimize>false</Optimize> + <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath> + <DefineConstants>DEBUG;TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x64' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugSymbols>true</DebugSymbols> + <DebugType>full</DebugType> + <Optimize>false</Optimize> + <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath> + <DefineConstants>DEBUG;TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugType>pdbonly</DebugType> + <Optimize>true</Optimize> + <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath> + <DefineConstants>TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x64' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugType>pdbonly</DebugType> + <Optimize>true</Optimize> + <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath> + <DefineConstants>TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <ItemGroup> + <Reference Include="protobuf-net"> + <HintPath>$(PackagesDir)\protobuf-net.$(ProtobufVersion)\lib\net40\protobuf-net.dll</HintPath> + </Reference> + <Reference Include="System" /> + <Reference Include="System.Core" /> + <Reference Include="System.Reactive.Core"> + <HintPath>$(PackagesDir)\Rx-Core.$(RxVersion)\lib\net45\System.Reactive.Core.dll</HintPath> + </Reference> + <Reference Include="System.Reactive.Interfaces"> + <HintPath>$(PackagesDir)\Rx-Interfaces.$(RxVersion)\lib\net45\System.Reactive.Interfaces.dll</HintPath> + </Reference> + <Reference Include="System.Runtime.Serialization" /> + <Reference Include="System.Xml.Linq" /> + <Reference Include="System.Data.DataSetExtensions" /> + <Reference Include="Microsoft.CSharp" /> + <Reference Include="System.Data" /> + <Reference Include="System.Xml" /> + </ItemGroup> + <ItemGroup> + <Compile Include="AbstractEStage.cs" /> + <Compile Include="IEStage.cs" /> + <Compile Include="IEventHandler.cs" /> + <Compile Include="IIdentifier.cs" /> + <Compile Include="IIdentifierFactory.cs" /> + <Compile Include="Impl\LoggingEventHandler.cs" /> + <Compile Include="Impl\MissingStartHandlerHandler.cs" /> + <Compile Include="Impl\MultiEventHandler.cs" /> + <Compile Include="Impl\PeriodicEvent.cs" /> + <Compile Include="Impl\PubSubEventHandler.cs" /> + <Compile Include="Impl\SingleThreadStage.cs" /> + <Compile Include="Impl\SyncStage.cs" /> + <Compile Include="Impl\ThreadPoolStage.cs" /> + <Compile Include="Impl\TimerStage.cs" /> + <Compile Include="IObserverFactory.cs" /> + <Compile Include="IStage.cs" /> + <Compile Include="Properties\AssemblyInfo.cs" /> + <Compile Include="Remote\ICodec.cs" /> + <Compile Include="Remote\ICodecFactory.cs" /> + <Compile Include="Remote\IDecoder.cs" /> + <Compile Include="Remote\IEncoder.cs" /> + <Compile Include="Remote\ILink.cs" /> + <Compile Include="Remote\Impl\ByteCodec.cs" /> + <Compile Include="Remote\Impl\ByteCodecFactory.cs" /> + <Compile Include="Remote\Impl\Channel.cs" /> + <Compile Include="Remote\Impl\DefaultRemoteManager.cs" /> + <Compile Include="Remote\Impl\DefaultRemoteMessage.cs" /> + <Compile Include="Remote\Impl\IntCodec.cs" /> + <Compile Include="Remote\Impl\IPEndpointComparer.cs" /> + <Compile Include="Remote\Impl\Link.cs" /> + <Compile Include="Remote\Impl\MultiCodec.cs" /> + <Compile Include="Remote\Impl\MultiDecoder.cs" /> + <Compile Include="Remote\Impl\MultiEncoder.cs" /> + <Compile Include="Remote\Impl\ObserverContainer.cs" /> + <Compile Include="Remote\Impl\RemoteEvent.cs" /> + <Compile Include="Remote\Impl\RemoteEventCodec.cs" /> + <Compile Include="Remote\Impl\RemoteEventDecoder.cs" /> + <Compile Include="Remote\Impl\RemoteEventEncoder.cs" /> + <Compile Include="Remote\Impl\RemoteEventEndpoint.cs" /> + <Compile Include="Remote\Impl\SocketRemoteIdentifier.cs" /> + <Compile Include="Remote\Impl\StringCodec.cs" /> + <Compile Include="Remote\Impl\StringIdentifier.cs" /> + <Compile Include="Remote\Impl\StringIdentifierFactory.cs" /> + <Compile Include="Remote\Impl\TransportClient.cs" /> + <Compile Include="Remote\Impl\TransportEvent.cs" /> + <Compile Include="Remote\Impl\TransportServer.cs" /> + <Compile Include="Remote\IRemoteEvent.cs" /> + <Compile Include="Remote\IRemoteIdentifier.cs" /> + <Compile Include="Remote\IRemoteIdentifierFactory.cs" /> + <Compile Include="Remote\IRemoteManager.cs" /> + <Compile Include="Remote\IRemoteMessage.cs" /> + <Compile Include="Remote\ISubscriptionManager.cs" /> + <Compile Include="Remote\Proto\WakeRemoteProtos.cs" /> + <Compile Include="Remote\RemoteConfiguration.cs" /> + <Compile Include="Remote\RemoteRuntimeException.cs" /> + <Compile Include="RX\AbstractObserver.cs" /> + <Compile Include="RX\AbstractRxStage.cs" /> + <Compile Include="RX\Impl\PubSubSubject.cs" /> + <Compile Include="RX\Impl\RxSyncStage.cs" /> + <Compile Include="RX\Impl\RxThreadPoolStage.cs" /> + <Compile Include="RX\Impl\RxTimerStage.cs" /> + <Compile Include="RX\Impl\SimpleSubject.cs" /> + <Compile Include="RX\IRxStage.cs" /> + <Compile Include="RX\IStaticObservable.cs" /> + <Compile Include="RX\ISubject.cs" /> + <Compile Include="RX\ObserverCompletedException.cs" /> + <Compile Include="src\main\cs\Examples\P2p\IEventSource.cs" /> + <Compile Include="src\main\cs\Examples\P2p\Pull2Push.cs" /> + <Compile Include="src\main\cs\PeriodicEvent.cs" /> + <Compile Include="Protobuf\WakeRemoteProtosGen.cs" /> + <Compile Include="Time\Event\Alarm.cs" /> + <Compile Include="Time\Event\StartTime.cs" /> + <Compile Include="Time\Event\StopTime.cs" /> + <Compile Include="Time\IClock.cs" /> + <Compile Include="Time\Runtime\Event\ClientAlarm.cs" /> + <Compile Include="Time\Runtime\Event\IdleClock.cs" /> + <Compile Include="Time\Runtime\Event\RuntimeAlarm.cs" /> + <Compile Include="Time\Runtime\Event\RuntimeStart.cs" /> + <Compile Include="Time\Runtime\Event\RuntimeStop.cs" /> + <Compile Include="Time\Runtime\ITimer.cs" /> + <Compile Include="Time\Runtime\LogicalTimer.cs" /> + <Compile Include="Time\Runtime\RealTimer.cs" /> + <Compile Include="Time\Runtime\RuntimeClock.cs" /> + <Compile Include="Time\Time.cs" /> + <Compile Include="Util\Actionable.cs" /> + <Compile Include="Util\Disposable.cs" /> + <Compile Include="Util\FixedThreadPoolTaskService.cs" /> + <Compile Include="Util\IStartable.cs" /> + <Compile Include="Util\ITaskService.cs" /> + <Compile Include="Util\LimitedConcurrencyLevelTaskScheduler.cs" /> + <Compile Include="Util\NetworkUtils.cs" /> + <Compile Include="Util\SerializationHelper.cs" /> + <Compile Include="Util\TaskExtensions.cs" /> + <Compile Include="Util\TimeHelper.cs" /> + <Compile Include="WakeRuntimeException.cs" /> + </ItemGroup> + <ItemGroup> + <None Include="packages.config" /> + <None Include="Protobuf\RemoteProtocol.proto" /> + </ItemGroup> + <ItemGroup> + <Folder Include="Time\Time\Event\" /> + <Folder Include="Time\Time\Runtime\Event\" /> + <Folder Include="Util\Util\" /> + </ItemGroup> + <ItemGroup> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj"> + <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project> + <Name>Org.Apache.REEF.Tang</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Utilities\Org.Apache.REEF.Utilities.csproj"> + <Project>{79e7f89a-1dfb-45e1-8d43-d71a954aeb98}</Project> + <Name>Org.Apache.REEF.Utilities</Name> + </ProjectReference> + </ItemGroup> + <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> + <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" /> + <!-- To modify your build process, add your task inside one of the targets below and uncomment it. + Other similar extension points exist, see Microsoft.Common.targets. + <Target Name="BeforeBuild"> + </Target> + <Target Name="AfterBuild"> + </Target> + --> +</Project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Properties/AssemblyInfo.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Wake/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..48a4764 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Properties/AssemblyInfo.cs @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Org.Apache.REEF.Wake")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Org.Apache.REEF.Wake")] +[assembly: AssemblyCopyright("Copyright © 2015")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("86a66ac8-0c8e-4652-b533-670e800cb0ea")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Protobuf/RemoteProtocol.proto ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Protobuf/RemoteProtocol.proto b/lang/cs/Org.Apache.REEF.Wake/Protobuf/RemoteProtocol.proto new file mode 100644 index 0000000..cd28d13 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Protobuf/RemoteProtocol.proto @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +option java_package = "org.apache.reef.wake.remote.proto"; +option java_outer_classname = "WakeRemoteProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; + +message WakeMessagePBuf { + required bytes data = 1; + required int64 seq = 2; + optional string source = 3; + optional string sink = 4; +} + +message WakeTuplePBuf { + required string className = 1; + required bytes data = 2; +} + http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Protobuf/WakeRemoteProtosGen.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Protobuf/WakeRemoteProtosGen.cs b/lang/cs/Org.Apache.REEF.Wake/Protobuf/WakeRemoteProtosGen.cs new file mode 100644 index 0000000..f3c59f8 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Protobuf/WakeRemoteProtosGen.cs @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//------------------------------------------------------------------------------ +// <auto-generated> +// This code was generated by a tool. +// +// Changes to this file may cause incorrect behavior and will be lost if +// the code is regenerated. +// </auto-generated> +//------------------------------------------------------------------------------ + +// Generated from: src/main/proto/RemoteProtocol.proto +namespace Org.Apache.REEF.Wake.Remote.Proto.WakeRemoteProtos +{ + [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"WakeMessagePBuf")] + public partial class WakeMessagePBuf : global::ProtoBuf.IExtensible + { + public WakeMessagePBuf() {} + + private byte[] _data; + [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"data", DataFormat = global::ProtoBuf.DataFormat.Default)] + public byte[] data + { + get { return _data; } + set { _data = value; } + } + private long _seq; + [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"seq", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)] + public long seq + { + get { return _seq; } + set { _seq = value; } + } + private string _source = ""; + [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"source", DataFormat = global::ProtoBuf.DataFormat.Default)] + [global::System.ComponentModel.DefaultValue("")] + public string source + { + get { return _source; } + set { _source = value; } + } + private string _sink = ""; + [global::ProtoBuf.ProtoMember(4, IsRequired = false, Name=@"sink", DataFormat = global::ProtoBuf.DataFormat.Default)] + [global::System.ComponentModel.DefaultValue("")] + public string sink + { + get { return _sink; } + set { _sink = value; } + } + private global::ProtoBuf.IExtension extensionObject; + global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing) + { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); } + } + + [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"WakeTuplePBuf")] + public partial class WakeTuplePBuf : global::ProtoBuf.IExtensible + { + public WakeTuplePBuf() {} + + private string _className; + [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"className", DataFormat = global::ProtoBuf.DataFormat.Default)] + public string className + { + get { return _className; } + set { _className = value; } + } + private byte[] _data; + [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"data", DataFormat = global::ProtoBuf.DataFormat.Default)] + public byte[] data + { + get { return _data; } + set { _data = value; } + } + private global::ProtoBuf.IExtension extensionObject; + global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing) + { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/AbstractObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/AbstractObserver.cs b/lang/cs/Org.Apache.REEF.Wake/RX/AbstractObserver.cs new file mode 100644 index 0000000..3c3451b --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/RX/AbstractObserver.cs @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Utilities.Logging; +using System; + +namespace Org.Apache.REEF.Wake.RX +{ + /// <summary> + /// An observer with logging-only onError and onCompleted() methods. + /// </summary> + /// <typeparam name="T">The observer type</typeparam> + public abstract class AbstractObserver<T> : IObserver<T> + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(AbstractObserver<T>)); + + public virtual void OnError(Exception error) + { + LOGGER.Log(Level.Info, "The observer " + GetType() + "has received an Exception: " + error); + } + + public virtual void OnCompleted() + { + LOGGER.Log(Level.Verbose, "The observer " + GetType() + "has received an onCompleted() "); + } + + public abstract void OnNext(T arg1); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/AbstractRxStage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/AbstractRxStage.cs b/lang/cs/Org.Apache.REEF.Wake/RX/AbstractRxStage.cs new file mode 100644 index 0000000..5238935 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/RX/AbstractRxStage.cs @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; + +namespace Org.Apache.REEF.Wake.RX +{ + /// <summary> + /// An Rx stage that implements metering + /// </summary> + public abstract class AbstractRxStage<T> : IRxStage<T> + { + //protected internal readonly Meter meter; + + /// <summary>Constructs an abstact rxstage</summary> + /// <param name="meterName">the name of the meter</param> + public AbstractRxStage(string meterName) + { + //meter = new Meter(meterName); + } + + /// <summary>Updates the meter</summary> + /// <param name="value">the event</param> + public virtual void OnNext(T value) + { + //meter.Mark(1); + } + + public abstract void OnCompleted(); + + public abstract void OnError(Exception error); + + public virtual void Dispose() + { + // no op + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/IRxStage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/IRxStage.cs b/lang/cs/Org.Apache.REEF.Wake/RX/IRxStage.cs new file mode 100644 index 0000000..bc93a0e --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/RX/IRxStage.cs @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; + +namespace Org.Apache.REEF.Wake.RX +{ + /// <summary>Stage that executes the observer</summary> + public interface IRxStage<T> : IObserver<T>, IStage + { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/IStaticObservable.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/IStaticObservable.cs b/lang/cs/Org.Apache.REEF.Wake/RX/IStaticObservable.cs new file mode 100644 index 0000000..035953c --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/RX/IStaticObservable.cs @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Org.Apache.REEF.Wake.RX +{ + public interface IStaticObservable + { + //intentionally empty + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/ISubject.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/ISubject.cs b/lang/cs/Org.Apache.REEF.Wake/RX/ISubject.cs new file mode 100644 index 0000000..9829dfd --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/RX/ISubject.cs @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; + +namespace Org.Apache.REEF.Wake.RX +{ + /// <summary>A class implementing Observer> and StaticObservable</summary> + /// <typeparam name="In">The in type</typeparam> + /// <typeparam name="Out">The out type</typeparam> + public interface ISubject<In, Out> : IObserver<In>, IStaticObservable + { + // intentionally empty + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/Impl/PubSubSubject.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/Impl/PubSubSubject.cs b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/PubSubSubject.cs new file mode 100644 index 0000000..73962dd --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/PubSubSubject.cs @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reactive.Subjects; +using System.Reflection; +using System.Text; +using Org.Apache.REEF.Utilities.Diagnostics; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Wake.RX.Impl +{ + /// <summary> + /// Subject to provide publish/subscribe interface. + /// Subscribes to class Types and invokes handlers for a given + /// type on call to OnNext + /// </summary> + /// <typeparam name="T">The super type that all event types + /// inherit from</typeparam> + public class PubSubSubject<T> : IObserver<T> + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(PubSubSubject<T>)); + + private Dictionary<Type, List<object>> _classToObserversMap; + private bool _completed; + private object _mutex; + + /// <summary> + /// Constructs a pub-sub Subject + /// </summary> + public PubSubSubject() + { + _classToObserversMap = new Dictionary<Type, List<object>>(); + _mutex = new object(); + } + + /// <summary> + /// Log on completion + /// </summary> + public void OnCompleted() + { + lock (_mutex) + { + _completed = true; + } + } + + /// <summary> + /// Log Exception + /// </summary> + /// <param name="error"></param> + public void OnError(Exception error) + { + lock (_mutex) + { + _completed = true; + } + } + + /// <summary> + /// Invoke the subscribed handlers for the event class type + /// </summary> + /// <param name="value">The event to process</param> + public void OnNext(T value) + { + if (value == null) + { + Exceptions.Throw(new ArgumentNullException("value"), LOGGER); + } + + lock (_mutex) + { + // If OnCompleted or OnError called, do nothing + if (_completed) + { + return; + } + + // Check that the event type has been subscribed + List<object> handlers; + if (!_classToObserversMap.TryGetValue(value.GetType(), out handlers)) + { + Exceptions.Throw(new ArgumentException("No event for type " + value.GetType()), LOGGER); + } + + // Invoke each IObserver for the event type + foreach (object handler in handlers) + { + Type handlerType = typeof(IObserver<>).MakeGenericType(new[] { value.GetType() }); + MethodInfo info = handlerType.GetMethod("OnNext"); + info.Invoke(handler, new[] { (object) value }); + } + } + } + + /// <summary> + /// Subscribe an IObserver for an event type + /// </summary> + /// <typeparam name="U">The event type</typeparam> + /// <param name="observer">The observer to handle the event</param> + /// <returns>An IDisposable object used to handle unsubscribing + /// the IObserver</returns> + public IDisposable Subscribe<U>(IObserver<U> observer) where U : T + { + lock (_mutex) + { + List<object> observers; + if (!_classToObserversMap.TryGetValue(typeof(U), out observers)) + { + observers = new List<object>(); + _classToObserversMap[typeof(U)] = observers; + } + observers.Add(observer); + } + + return new DisposableResource<U>(_classToObserversMap, observer, _mutex); + } + + /// <summary> + /// Utility class to handle disposing of an IObserver + /// </summary> + private class DisposableResource<U> : IDisposable + { + private Dictionary<Type, List<object>> _observersMap; + private IObserver<U> _observer; + private object _mutex; + private bool _disposed; + + public DisposableResource(Dictionary<Type, List<object>> observersMap, IObserver<U> observer, object mutex) + { + _observersMap = observersMap; + _observer = observer; + _mutex = mutex; + _disposed = false; + } + + /// <summary> + /// Unsubscribe the IObserver from the observer map + /// </summary> + public void Dispose() + { + if (!_disposed) + { + UnsubscribeObserver(); + _disposed = true; + } + } + + private void UnsubscribeObserver() + { + lock (_mutex) + { + List<object> observers; + if (_observersMap.TryGetValue(typeof(U), out observers)) + { + observers.Remove(_observer); + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxSyncStage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxSyncStage.cs b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxSyncStage.cs new file mode 100644 index 0000000..e2527a0 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxSyncStage.cs @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; + +namespace Org.Apache.REEF.Wake.RX.Impl +{ + /// <summary>Stage that executes the observer synchronously</summary> + public class RxSyncStage<T> : AbstractRxStage<T> + { + private readonly IObserver<T> _observer; + + /// <summary>Constructs a Rx synchronous stage</summary> + /// <param name="observer">the observer</param> + public RxSyncStage(IObserver<T> observer) : base(observer.GetType().FullName) + { + _observer = observer; + } + + /// <summary>Provides the observer with the new value</summary> + /// <param name="value">the new value</param> + public override void OnNext(T value) + { + base.OnNext(value); + _observer.OnNext(value); + } + + /// <summary> + /// Notifies the observer that the provider has experienced an error + /// condition. + /// </summary> + /// <param name="error">the error</param> + public override void OnError(Exception error) + { + _observer.OnError(error); + } + + /// <summary> + /// Notifies the observer that the provider has finished sending push-based + /// notifications. + /// </summary> + public override void OnCompleted() + { + _observer.OnCompleted(); + } + + /// <summary> + /// Closes the stage + /// </summary> + public override void Dispose() + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxThreadPoolStage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxThreadPoolStage.cs b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxThreadPoolStage.cs new file mode 100644 index 0000000..d329430 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxThreadPoolStage.cs @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Utilities.Diagnostics; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Wake.Util; +using System; + +namespace Org.Apache.REEF.Wake.RX.Impl +{ + /// <summary>Stage that executes the observer with a thread pool</summary> + public class RxThreadPoolStage<T> : AbstractRxStage<T> + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(RxThreadPoolStage<T>)); + + private readonly IObserver<T> _observer; + + private readonly ITaskService _taskService; + + /// <summary>Constructs a Rx thread pool stage</summary> + /// <param name="observer">the observer to execute</param> + /// <param name="numThreads">the number of threads</param> + public RxThreadPoolStage(IObserver<T> observer, int numThreads) + : base(observer.GetType().FullName) + { + _observer = observer; + if (numThreads <= 0) + { + Exceptions.Throw(new WakeRuntimeException("numThreads " + numThreads + " is less than or equal to 0"), LOGGER); + } + _taskService = new FixedThreadPoolTaskService(numThreads); + } + + /// <summary>Provides the observer with the new value</summary> + /// <param name="value">the new value</param> + public override void OnNext(T value) + { + base.OnNext(value); + _taskService.Execute(new _Startable_58(this, value).Start); + } + + /// <summary> + /// Notifies the observer that the provider has experienced an error + /// condition. + /// </summary> + /// <param name="error">the error</param> + public override void OnError(Exception error) + { + _taskService.Execute(new _Startable_75(this, error).Start); + } + + /// <summary> + /// Notifies the observer that the provider has finished sending push-based + /// notifications. + /// </summary> + public override void OnCompleted() + { + _taskService.Execute(new _Startable_91(this).Start); + } + + /// <summary> + /// Closes the stage + /// </summary> + public override void Dispose() + { + _taskService.Shutdown(); + } + + private sealed class _Startable_58 : IStartable + { + private readonly RxThreadPoolStage<T> _enclosing; + private readonly T _value; + + public _Startable_58(RxThreadPoolStage<T> enclosing, T value) + { + _enclosing = enclosing; + _value = value; + } + + public void Start() + { + _enclosing._observer.OnNext(_value); + } + } + + private sealed class _Startable_75 : IStartable + { + private readonly RxThreadPoolStage<T> _enclosing; + private readonly Exception _error; + + public _Startable_75(RxThreadPoolStage<T> enclosing, Exception error) + { + _enclosing = enclosing; + _error = error; + } + + public void Start() + { + _enclosing._observer.OnError(_error); + } + } + + private sealed class _Startable_91 : IStartable + { + private readonly RxThreadPoolStage<T> _enclosing; + + public _Startable_91(RxThreadPoolStage<T> enclosing) + { + _enclosing = enclosing; + } + + public void Start() + { + _enclosing._observer.OnCompleted(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxTimerStage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxTimerStage.cs b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxTimerStage.cs new file mode 100644 index 0000000..541c2d4 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxTimerStage.cs @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Timers; + +using Org.Apache.REEF.Wake.Impl; + +namespace Org.Apache.REEF.Wake.RX.Impl +{ + /// <summary>Timer stage that provides events to the observer periodically</summary> + public class RxTimerStage : IStage, IStaticObservable + { + private readonly Timer _timer; + private readonly PeriodicEvent _value = new PeriodicEvent(); + private readonly IObserver<PeriodicEvent> _observer; + + /// <summary>Constructs a Rx timer stage</summary> + /// <param name="observer">the observer</param> + /// <param name="period">the period in milli-seconds</param> + public RxTimerStage(IObserver<PeriodicEvent> observer, long period) + : this(observer, 0, period) + { + } + + /// <summary>Constructs a Rx timer stage</summary> + /// <param name="observer">the observer</param> + /// <param name="initialDelay">the initial delay in milli-seconds</param> + /// <param name="period">the period in milli-seconds</param> + public RxTimerStage(IObserver<PeriodicEvent> observer, long initialDelay, long period) + { + _observer = observer; + _timer = new Timer(period); + _timer.Elapsed += (sender, e) => OnTimedEvent(sender, e, _observer, _value); + _timer.Enabled = true; + } + + /// <summary> + /// Closes the stage + /// </summary> + public void Dispose() + { + _timer.Stop(); + } + + private static void OnTimedEvent(object source, ElapsedEventArgs e, IObserver<PeriodicEvent> observer, PeriodicEvent value) + { + observer.OnNext(value); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/Impl/SimpleSubject.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/Impl/SimpleSubject.cs b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/SimpleSubject.cs new file mode 100644 index 0000000..7d4aaa8 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/SimpleSubject.cs @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; + +namespace Org.Apache.REEF.Wake.RX.Impl +{ + /// <summary>A Subject that relays all messages to its subscribers.</summary> + public class SimpleSubject<T> : ISubject<T, T> + { + private readonly IObserver<T> _observer; + + /// <summary>Constructs a simple subject</summary> + /// <param name="observer">the observer</param> + public SimpleSubject(IObserver<T> observer) + { + _observer = observer; + } + + /// <summary>Provides the observer with the new value</summary> + /// <param name="value">the new value</param> + public virtual void OnNext(T value) + { + _observer.OnNext(value); + } + + /// <summary>Provides the observer with the error</summary> + /// <param name="error">the error</param> + public virtual void OnError(Exception error) + { + _observer.OnError(error); + } + + /// <summary> + /// Provides the observer with it has finished sending push-based + /// notifications. + /// </summary> + public virtual void OnCompleted() + { + _observer.OnCompleted(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/ObserverCompletedException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/ObserverCompletedException.cs b/lang/cs/Org.Apache.REEF.Wake/RX/ObserverCompletedException.cs new file mode 100644 index 0000000..e620a7c --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/RX/ObserverCompletedException.cs @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; + +namespace Org.Apache.REEF.Wake.RX +{ + /// <summary> + /// It is illegal to call onError() or onCompleted() when a call to onNext() is + /// still outstanding, or to call onNext(), onError() or onCompleted() after a + /// call to onError() or onCompleted() has been dispatched. + /// </summary> + /// <remarks> + /// It is illegal to call onError() or onCompleted() when a call to onNext() is + /// still outstanding, or to call onNext(), onError() or onCompleted() after a + /// call to onError() or onCompleted() has been dispatched. Observers may throw + /// an ObserverCompleted exception whenever this API is violated. Violating the + /// API leaves the Observer (and any resources that it holds) in an undefined + /// state, and throwing ObserverCompleted exceptions is optional. + /// Callers receiving this exception should simply pass it up the stack to the + /// Aura runtime. They should not attempt to forward it on to upstream or + /// downstream stages. The easiest way to do this is to ignore the exception + /// entirely. + /// </remarks> + [System.Serializable] + public class ObserverCompletedException : InvalidOperationException + { + private const long serialVersionUID = 1L; + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/ICodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/ICodec.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/ICodec.cs new file mode 100644 index 0000000..2591d1e --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/ICodec.cs @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.REEF.Wake.Remote +{ + public interface ICodec + { + } + + /// <summary> + /// Interface for serialization routines that translate back and forth between + /// byte arrays with low latency. + /// </summary> + /// <typeparam name="T">The codec type</typeparam> + public interface ICodec<T> : ICodec, IEncoder<T>, IDecoder<T> + { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/ICodecFactory.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/ICodecFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/ICodecFactory.cs new file mode 100644 index 0000000..68b656a --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/ICodecFactory.cs @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Remote.Impl; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Org.Apache.REEF.Wake.Remote +{ + [DefaultImplementation(typeof(ByteCodecFactory))] + public interface ICodecFactory + { + object Create(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/IDecoder.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IDecoder.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IDecoder.cs new file mode 100644 index 0000000..07dd28a --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IDecoder.cs @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.REEF.Wake.Remote +{ + public interface IDecoder + { + } + + /// <summary> + /// Interface for serialization routines that translate back and forth between + /// byte arrays with low latency. + /// </summary> + /// <typeparam name="T">The decoder type</typeparam> + public interface IDecoder<T> : IDecoder + { + /// <summary>Decodes the given byte array into an object</summary> + /// <param name="data"></param> + /// <returns>the decoded object</returns> + T Decode(byte[] data); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/IEncoder.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IEncoder.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IEncoder.cs new file mode 100644 index 0000000..b399131 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IEncoder.cs @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.REEF.Wake.Remote +{ + public interface IEncoder + { + } + + /// <summary> + /// Interface for serialization routines that translate back and forth between + /// byte arrays with low latency. + /// </summary> + /// <typeparam name="T">The encoder type</typeparam> + public interface IEncoder<T> : IEncoder + { + /// <summary>Encodes the given object into a Byte Array</summary> + /// <param name="obj"></param> + /// <returns>a byte[] representation of the object</returns> + byte[] Encode(T obj); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/ILink.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/ILink.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/ILink.cs new file mode 100644 index 0000000..c0ce1a2 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/ILink.cs @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.REEF.Wake.Remote.Impl; + +namespace Org.Apache.REEF.Wake.Remote +{ + /// <summary> + /// Represents a link between two endpoints + /// </summary> + public interface ILink<T> : IDisposable + { + /// <summary> + /// Returns the local socket address + /// </summary> + IPEndPoint LocalEndpoint { get; } + + /// <summary> + /// Returns the remote socket address + /// </summary> + IPEndPoint RemoteEndpoint { get; } + + /// <summary> + /// Writes the value to this link asynchronously + /// </summary> + /// <param name="value">The data to write</param> + /// <param name="token">The cancellation token</param> + Task WriteAsync(T value, CancellationToken token); + + /// <summary> + /// Writes the value to this link synchronously + /// </summary> + /// <param name="value">The data to write</param> + void Write(T value); + + /// <summary> + /// Reads the value from this link asynchronously + /// </summary> + /// <returns>The read data</returns> + /// <param name="token">The cancellation token</param> + Task<T> ReadAsync(CancellationToken token); + + /// <summary> + /// Reads the value from this link synchronously + /// </summary> + /// <returns>The read data</returns> + T Read(); + } +}
