http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Impl/PubSubEventHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Impl/PubSubEventHandler.cs b/lang/cs/Source/WAKE/Wake/Impl/PubSubEventHandler.cs deleted file mode 100644 index ba4a5a0..0000000 --- a/lang/cs/Source/WAKE/Wake/Impl/PubSubEventHandler.cs +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
using System;
using System.Collections.Generic;
using System.Reflection;
using Org.Apache.Reef.Utilities.Logging;

namespace Org.Apache.Reef.Wake.Impl
{
    /// <summary>
    /// Event handler that provides a publish/subscribe interface: handlers are
    /// registered per event type and an incoming event is dispatched to the
    /// handlers registered for its exact runtime type.
    /// </summary>
    /// <typeparam name="T">The base type of the events accepted by this handler</typeparam>
    public class PubSubEventHandler<T> : IEventHandler<T>
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(PubSubEventHandler<T>));

        // Event type -> subscribed handlers. The dictionary instance is also the
        // monitor that guards every read and write of the map and its lists.
        private readonly Dictionary<Type, List<object>> _classToHandlersMap;

        /// <summary>
        /// Construct a pub-sub event handler with no subscriptions
        /// </summary>
        public PubSubEventHandler()
        {
            _classToHandlersMap = new Dictionary<Type, List<object>>();
        }

        /// <summary>
        /// Subscribe an event handler for an event type
        /// </summary>
        /// <typeparam name="U">The event type the handler is registered for</typeparam>
        /// <param name="handler">The event handler</param>
        public void Subscribe<U>(IEventHandler<U> handler) where U : T
        {
            lock (_classToHandlersMap)
            {
                List<object> handlers;
                if (!_classToHandlersMap.TryGetValue(typeof(U), out handlers))
                {
                    handlers = new List<object>();
                    _classToHandlersMap[typeof(U)] = handlers;
                }
                handlers.Add(handler);
            }
        }

        /// <summary>
        /// Invoke the subscribed handlers for the event class type
        /// </summary>
        /// <remarks>
        /// Handlers are looked up by <c>value.GetType()</c>, i.e. the exact runtime
        /// type; handlers registered for a base type are not considered.
        /// Dispatch runs over a snapshot of the handler list, so a handler that
        /// subscribes further handlers while it is being invoked does not break
        /// the iteration; such handlers first see the next event.
        /// </remarks>
        /// <param name="value">The event to process</param>
        public void OnNext(T value)
        {
            if (value == null)
            {
                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("value"), LOGGER);
            }

            Type eventType = value.GetType();

            lock (_classToHandlersMap)
            {
                // Check that the event type has been subscribed
                List<object> handlers;
                if (!_classToHandlersMap.TryGetValue(eventType, out handlers))
                {
                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new ArgumentException("No event for type " + eventType), LOGGER);
                }

                // The target method is the same for every handler of this event
                // type, so resolve it once instead of once per handler.
                Type handlerType = typeof(IEventHandler<>).MakeGenericType(new[] { eventType });
                MethodInfo onNext = handlerType.GetMethod("OnNext");
                object[] arguments = { value };

                // The monitor is re-entrant, so a handler may call Subscribe on this
                // thread during dispatch. Enumerating a copy keeps that from
                // invalidating the enumerator.
                foreach (object handler in handlers.ToArray())
                {
                    onNext.Invoke(handler, arguments);
                }
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Impl/SingleThreadStage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Impl/SingleThreadStage.cs b/lang/cs/Source/WAKE/Wake/Impl/SingleThreadStage.cs deleted file mode 100644 index 163d347..0000000 --- a/lang/cs/Source/WAKE/Wake/Impl/SingleThreadStage.cs +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
using System;
using System.Collections.Concurrent;
using System.Threading;

namespace Org.Apache.Reef.Wake.Impl
{
    /// <summary>Single thread stage that runs the event handler</summary>
    /// <typeparam name="T">The type of event processed by the stage</typeparam>
    public class SingleThreadStage<T> : AbstractEStage<T>
    {
        private readonly BlockingCollection<T> queue;

        // Kept so that Dispose can tell the worker loop to stop.
        private readonly Producer<T> producer;

        private readonly Thread thread;

        /// <summary>
        /// Constructs the stage and immediately starts its worker thread
        /// </summary>
        /// <param name="handler">The handler invoked for every queued event</param>
        /// <param name="capacity">The bounded capacity of the event queue</param>
        public SingleThreadStage(IEventHandler<T> handler, int capacity) : base(handler.GetType().FullName)
        {
            queue = new BlockingCollection<T>(capacity);
            producer = new Producer<T>(queue, handler, false);
            thread = new Thread(new ThreadStart(producer.Run));
            thread.Start();
        }

        /// <summary>
        /// Puts the value to the queue, which will be processed by the handler later.
        /// The queue is a bounded BlockingCollection, so this call blocks while the
        /// queue is at capacity rather than throwing.
        /// </summary>
        /// <param name="value">the value</param>
        public override void OnNext(T value)
        {
            base.OnNext(value);
            queue.Add(value);
        }

        /// <summary>
        /// Closes the stage: marks the worker as stopped and interrupts its thread
        /// so that a blocked Take() wakes up and the loop exits.
        /// </summary>
        public override void Dispose()
        {
            // The flag must be set on the producer itself. A bool handed to its
            // constructor is copied, so flipping a field on this class would never
            // be observed by the worker loop.
            producer.Stop();
            thread.Interrupt();
        }
    }

    /// <summary>Takes events from the queue and provides them to the handler</summary>
    /// <typeparam name="T">The type</typeparam>
    internal class Producer<T>
    {
        private readonly BlockingCollection<T> _queue;

        private readonly IEventHandler<T> _handler;

        // Written by the disposing thread, read by the worker thread.
        private volatile bool _interrupted;

        /// <summary>Creates the worker loop state</summary>
        /// <param name="queue">The queue events are taken from</param>
        /// <param name="handler">The handler each event is passed to</param>
        /// <param name="interrupted">Initial value of the stop flag</param>
        internal Producer(BlockingCollection<T> queue, IEventHandler<T> handler, bool interrupted)
        {
            _queue = queue;
            _handler = handler;
            _interrupted = interrupted;
        }

        /// <summary>
        /// Requests that <see cref="Run"/> exits the next time an exception
        /// (for example the interrupt raised by Thread.Interrupt) surfaces in the loop.
        /// </summary>
        internal void Stop()
        {
            _interrupted = true;
        }

        /// <summary>
        /// Worker loop: blocks on the queue and hands each event to the handler
        /// until the producer has been stopped and an exception ends the current
        /// iteration.
        /// </summary>
        public void Run()
        {
            while (true)
            {
                try
                {
                    T value = _queue.Take();
                    _handler.OnNext(value);
                }
                catch (Exception)
                {
                    // Exceptions thrown by the handler are deliberately ignored so
                    // that one failing event does not kill the stage. Only leave
                    // the loop once a stop has been requested.
                    if (_interrupted)
                    {
                        break;
                    }
                }
            }
        }
    }
}
a/lang/cs/Source/WAKE/Wake/Impl/SyncStage.cs b/lang/cs/Source/WAKE/Wake/Impl/SyncStage.cs deleted file mode 100644 index bfa3fe0..0000000 --- a/lang/cs/Source/WAKE/Wake/Impl/SyncStage.cs +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
using Org.Apache.Reef.Wake;

namespace Org.Apache.Reef.Wake.Impl
{
    /// <summary>
    /// Stage that runs its event handler on the calling thread
    /// </summary>
    /// <typeparam name="T">The type of event processed by the stage</typeparam>
    public class SyncStage<T> : AbstractEStage<T>
    {
        // The handler every event is forwarded to.
        private readonly IEventHandler<T> _handler;

        /// <summary>
        /// Creates a synchronous stage; the stage is named after the handler's
        /// full type name.
        /// </summary>
        /// <param name="handler">the event handler to invoke</param>
        public SyncStage(IEventHandler<T> handler)
            : base(handler.GetType().FullName)
        {
            _handler = handler;
        }

        /// <summary>
        /// Runs the base stage processing for the event and then passes the event
        /// to the handler, all before returning to the caller.
        /// </summary>
        /// <param name="value">the event to handle</param>
        public override void OnNext(T value)
        {
            base.OnNext(value);
            _handler.OnNext(value);
        }

        /// <summary>
        /// Nothing to release: the stage owns no threads or other resources.
        /// </summary>
        public override void Dispose()
        {
        }
    }
}
using System.Threading;
using Org.Apache.Reef.Utilities.Logging;
using Org.Apache.Reef.Wake;
using Org.Apache.Reef.Wake.Util;

namespace Org.Apache.Reef.Wake.Impl
{
    /// <summary>
    /// Stage that hands each event to a task service, which runs the event
    /// handler for it.
    /// </summary>
    /// <typeparam name="T">The type of event processed by the stage</typeparam>
    public class ThreadPoolStage<T> : AbstractEStage<T>
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ThreadPoolStage<T>));

        private readonly IEventHandler<T> _handler;

        private readonly ITaskService _taskService;

        // Greater than zero only when this stage created the task service itself;
        // Dispose uses it to decide whether the service may be shut down.
        private readonly int _numThreads;

        /// <summary>
        /// Creates a stage backed by its own fixed-size thread pool task service
        /// </summary>
        /// <param name="handler">The event handler to execute</param>
        /// <param name="numThreads">The number of threads to use; must be positive</param>
        public ThreadPoolStage(IEventHandler<T> handler, int numThreads)
            : base(handler.GetType().FullName)
        {
            if (numThreads <= 0)
            {
                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new WakeRuntimeException("numThreads " + numThreads + " is less than or equal to 0"), LOGGER);
            }

            _handler = handler;
            _numThreads = numThreads;
            _taskService = new FixedThreadPoolTaskService(numThreads);
        }

        /// <summary>
        /// Creates a stage that runs on a task service supplied by the caller.
        /// The stage does not shut that service down on Dispose.
        /// </summary>
        /// <param name="handler">the event handler to execute</param>
        /// <param name="taskService">the externally provided task service</param>
        public ThreadPoolStage(IEventHandler<T> handler, ITaskService taskService)
            : base(handler.GetType().FullName)
        {
            _handler = handler;
            _taskService = taskService;
            _numThreads = 0;
        }

        /// <summary>
        /// Runs the base stage processing and then submits the handler
        /// invocation for this event to the task service.
        /// </summary>
        /// <param name="value">the event to handle</param>
        public override void OnNext(T value)
        {
            base.OnNext(value);

            HandlerInvocation invocation = new HandlerInvocation(_handler, value);
            _taskService.Execute(invocation.Start);
        }

        /// <summary>
        /// Shuts the task service down if, and only if, this stage created it
        /// </summary>
        public override void Dispose()
        {
            bool ownsTaskService = _numThreads > 0;
            if (ownsTaskService)
            {
                _taskService.Shutdown();
            }
        }

        /// <summary>
        /// Unit of work submitted to the task service: one handler call for one event
        /// </summary>
        private sealed class HandlerInvocation : IStartable
        {
            private readonly IEventHandler<T> _target;
            private readonly T _event;

            public HandlerInvocation(IEventHandler<T> target, T value)
            {
                _target = target;
                _event = value;
            }

            public void Start()
            {
                _target.OnNext(_event);
            }
        }
    }
}
using System.Timers;

using Org.Apache.Reef.Wake;

namespace Org.Apache.Reef.Wake.Impl
{
    /// <summary>Stage that triggers an event handler periodically</summary>
    public class TimerStage : IStage
    {
        private readonly Timer _timer;

        // The same event instance is delivered on every tick.
        private readonly PeriodicEvent _value = new PeriodicEvent();
        private readonly IEventHandler<PeriodicEvent> _handler;

        /// <summary>Constructs a timer stage with no initial delay</summary>
        /// <param name="handler">an event handler</param>
        /// <param name="period">a period in milli-seconds</param>
        public TimerStage(IEventHandler<PeriodicEvent> handler, long period) : this(handler, 0, period)
        {
        }

        /// <summary>Constructs a timer stage and starts the timer</summary>
        /// <remarks>
        /// NOTE: <paramref name="initialDelay"/> is accepted but not used by this
        /// implementation. The handler is first invoked when the first
        /// <paramref name="period"/> interval elapses.
        /// </remarks>
        /// <param name="handler">an event handler</param>
        /// <param name="initialDelay">an initial delay (currently ignored)</param>
        /// <param name="period">a period in milli-seconds, used as the timer interval</param>
        public TimerStage(IEventHandler<PeriodicEvent> handler, long initialDelay, long period)
        {
            _handler = handler;
            _timer = new Timer(period);
            _timer.Elapsed += (sender, e) => _handler.OnNext(_value);
            _timer.Enabled = true;
        }

        /// <summary>
        /// Stops the timer and releases the resources it holds
        /// </summary>
        public void Dispose()
        {
            _timer.Stop();

            // Stop() only disables further ticks; the timer itself is disposable
            // and has to be disposed to release its resources.
            _timer.Dispose();
        }
    }
}
more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System.Reflection; -using System.Runtime.CompilerServices; -using System.Runtime.InteropServices; - -// General Information about an assembly is controlled through the following -// set of attributes. Change these attribute values to modify the information -// associated with an assembly. -[assembly: AssemblyTitle("Wake")] -[assembly: AssemblyDescription("")] -[assembly: AssemblyConfiguration("")] -[assembly: AssemblyCompany("")] -[assembly: AssemblyProduct("Wake")] -[assembly: AssemblyCopyright("Copyright © 2015")] -[assembly: AssemblyTrademark("")] -[assembly: AssemblyCulture("")] - -// Setting ComVisible to false makes the types in this assembly not visible -// to COM components. If you need to access a type in this assembly from -// COM, set the ComVisible attribute to true on that type. 
-[assembly: ComVisible(false)] - -// The following GUID is for the ID of the typelib if this project is exposed to COM -[assembly: Guid("86a66ac8-0c8e-4652-b533-670e800cb0ea")] - -// Version information for an assembly consists of the following four values: -// -// Major Version -// Minor Version -// Build Number -// Revision -// -// You can specify all the values or you can default the Build and Revision Numbers -// by using the '*' as shown below: -// [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.0.0.0")] -[assembly: AssemblyFileVersion("1.0.0.0")] http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Protobuf/RemoteProtocol.proto ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Protobuf/RemoteProtocol.proto b/lang/cs/Source/WAKE/Wake/Protobuf/RemoteProtocol.proto deleted file mode 100644 index cd28d13..0000000 --- a/lang/cs/Source/WAKE/Wake/Protobuf/RemoteProtocol.proto +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
// Options that control the Java classes generated from this schema.
option java_package = "org.apache.reef.wake.remote.proto";
option java_outer_classname = "WakeRemoteProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;

// A Wake remote message: a byte payload with a sequence number and optional
// source / sink identifiers.
message WakeMessagePBuf {
    required bytes data = 1;
    required int64 seq = 2;
    optional string source = 3;
    optional string sink = 4;
}

// A class name paired with a byte payload.
message WakeTuplePBuf {
    required string className = 1;
    required bytes data = 2;
}
//------------------------------------------------------------------------------
// <auto-generated>
//     This code was generated by a tool.
//
//     Changes to this file may cause incorrect behavior and will be lost if
//     the code is regenerated.
// </auto-generated>
//------------------------------------------------------------------------------

// Generated from: src/main/proto/RemoteProtocol.proto
namespace Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos
{
    /// <summary>
    /// protobuf-net contract for the WakeMessagePBuf message: required data and
    /// seq fields plus optional source and sink strings.
    /// </summary>
    [global::System.Serializable]
    [global::ProtoBuf.ProtoContract(Name = @"WakeMessagePBuf")]
    public partial class WakeMessagePBuf : global::ProtoBuf.IExtensible
    {
        public WakeMessagePBuf()
        {
        }

        private byte[] _data;

        [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name = @"data", DataFormat = global::ProtoBuf.DataFormat.Default)]
        public byte[] data
        {
            get
            {
                return _data;
            }
            set
            {
                _data = value;
            }
        }

        private long _seq;

        [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name = @"seq", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
        public long seq
        {
            get
            {
                return _seq;
            }
            set
            {
                _seq = value;
            }
        }

        private string _source = "";

        [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name = @"source", DataFormat = global::ProtoBuf.DataFormat.Default)]
        [global::System.ComponentModel.DefaultValue("")]
        public string source
        {
            get
            {
                return _source;
            }
            set
            {
                _source = value;
            }
        }

        private string _sink = "";

        [global::ProtoBuf.ProtoMember(4, IsRequired = false, Name = @"sink", DataFormat = global::ProtoBuf.DataFormat.Default)]
        [global::System.ComponentModel.DefaultValue("")]
        public string sink
        {
            get
            {
                return _sink;
            }
            set
            {
                _sink = value;
            }
        }

        // Storage for fields that are not declared on this contract.
        private global::ProtoBuf.IExtension extensionObject;

        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
        {
            return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing);
        }
    }

    /// <summary>
    /// protobuf-net contract for the WakeTuplePBuf message: a required class
    /// name and a required data field.
    /// </summary>
    [global::System.Serializable]
    [global::ProtoBuf.ProtoContract(Name = @"WakeTuplePBuf")]
    public partial class WakeTuplePBuf : global::ProtoBuf.IExtensible
    {
        public WakeTuplePBuf()
        {
        }

        private string _className;

        [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name = @"className", DataFormat = global::ProtoBuf.DataFormat.Default)]
        public string className
        {
            get
            {
                return _className;
            }
            set
            {
                _className = value;
            }
        }

        private byte[] _data;

        [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name = @"data", DataFormat = global::ProtoBuf.DataFormat.Default)]
        public byte[] data
        {
            get
            {
                return _data;
            }
            set
            {
                _data = value;
            }
        }

        // Storage for fields that are not declared on this contract.
        private global::ProtoBuf.IExtension extensionObject;

        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
        {
            return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing);
        }
    }
}
using Org.Apache.Reef.Utilities.Logging;
using System;

namespace Org.Apache.Reef.Wake.RX
{
    /// <summary>
    /// An observer whose OnError and OnCompleted implementations only log;
    /// subclasses supply OnNext and may override the other two.
    /// </summary>
    /// <typeparam name="T">The type of the observed values</typeparam>
    public abstract class AbstractObserver<T> : IObserver<T>
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(AbstractObserver<T>));

        /// <summary>
        /// Logs the received exception at Info level
        /// </summary>
        /// <param name="error">The exception reported to the observer</param>
        public virtual void OnError(Exception error)
        {
            // A space separates the type name from the text that follows it;
            // without it the two run together in the log output.
            LOGGER.Log(Level.Info, "The observer " + GetType() + " has received an Exception: " + error);
        }

        /// <summary>
        /// Logs the completion notification at Verbose level
        /// </summary>
        public virtual void OnCompleted()
        {
            LOGGER.Log(Level.Verbose, "The observer " + GetType() + " has received an onCompleted() ");
        }

        /// <summary>
        /// Handles the next observed value
        /// </summary>
        /// <param name="arg1">The observed value</param>
        public abstract void OnNext(T arg1);
    }
}
using System;

namespace Org.Apache.Reef.Wake.RX
{
    /// <summary>
    /// Base class for Rx stages. Metering is not wired up in this version: the
    /// meter name is accepted but unused and OnNext does nothing.
    /// </summary>
    /// <typeparam name="T">The type of event processed by the stage</typeparam>
    public abstract class AbstractRxStage<T> : IRxStage<T>
    {
        /// <summary>Constructs an abstract Rx stage</summary>
        /// <param name="meterName">the name of the meter (currently unused)</param>
        public AbstractRxStage(string meterName)
        {
            // Intentionally empty: no meter is created.
        }

        /// <summary>
        /// Hook called for each event; the base implementation is a no-op.
        /// </summary>
        /// <param name="value">the event</param>
        public virtual void OnNext(T value)
        {
            // Intentionally empty: no meter is updated.
        }

        /// <summary>Notifies the stage that no further events will arrive</summary>
        public abstract void OnCompleted();

        /// <summary>Notifies the stage of an error</summary>
        /// <param name="error">the reported exception</param>
        public abstract void OnError(Exception error);

        /// <summary>
        /// Releases resources; the base implementation has nothing to release.
        /// </summary>
        public virtual void Dispose()
        {
        }
    }
}
using System;

namespace Org.Apache.Reef.Wake.RX
{
    /// <summary>
    /// A stage that is also an observer: it combines <see cref="IObserver{T}"/>
    /// with <see cref="IStage"/> and adds no members of its own.
    /// </summary>
    /// <typeparam name="T">The type of the observed values</typeparam>
    public interface IRxStage<T> : IObserver<T>, IStage
    {
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Org.Apache.Reef.Wake.RX
{
    /// <summary>
    /// Marker interface; it declares no members.
    /// </summary>
    public interface IStaticObservable
    {
        // intentionally empty
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Subjects;
using System.Reflection;
using System.Text;
using Org.Apache.Reef.Utilities.Diagnostics;
using Org.Apache.Reef.Utilities.Logging;

// File: lang/cs/Source/WAKE/Wake/RX/ISubject.cs
namespace Org.Apache.Reef.Wake.RX
{
    /// <summary>
    /// An observer of <typeparamref name="In"/> values that is also an
    /// <see cref="IStaticObservable"/>. Declares no members of its own.
    /// </summary>
    /// <typeparam name="In">The in type</typeparam>
    /// <typeparam name="Out">The out type</typeparam>
    public interface ISubject<In, Out> : IObserver<In>, IStaticObservable
    {
        // intentionally empty
    }
}

// File: lang/cs/Source/WAKE/Wake/RX/Impl/PubSubSubject.cs
namespace Org.Apache.Reef.Wake.RX.Impl
{
    /// <summary>
    /// Subject providing a publish/subscribe interface.
    /// Observers subscribe per event type; <see cref="OnNext"/> invokes the
    /// observers registered for the exact runtime type of the event
    /// (the lookup key is <c>value.GetType()</c>, so an observer registered
    /// for a base type is not invoked for a derived event).
    /// </summary>
    /// <typeparam name="T">The super type that all event types
    /// inherit from</typeparam>
    public class PubSubSubject<T> : IObserver<T>
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(PubSubSubject<T>));

        private readonly Dictionary<Type, List<object>> _classToObserversMap;
        private readonly object _mutex;
        private bool _completed;

        /// <summary>
        /// Constructs a pub-sub Subject
        /// </summary>
        public PubSubSubject()
        {
            _classToObserversMap = new Dictionary<Type, List<object>>();
            _mutex = new object();
        }

        /// <summary>
        /// Marks the subject as completed. Later calls to <see cref="OnNext"/>
        /// are ignored. Subscribed observers are not notified.
        /// </summary>
        public void OnCompleted()
        {
            lock (_mutex)
            {
                _completed = true;
            }
        }

        /// <summary>
        /// Marks the subject as completed. Later calls to <see cref="OnNext"/>
        /// are ignored. The error is not forwarded to subscribed observers.
        /// </summary>
        /// <param name="error">The reported error (not used)</param>
        public void OnError(Exception error)
        {
            lock (_mutex)
            {
                _completed = true;
            }
        }

        /// <summary>
        /// Invoke the subscribed observers for the event class type
        /// </summary>
        /// <remarks>
        /// Dispatch runs while holding the subject's lock and iterates over a
        /// snapshot of the registrations. An observer may therefore subscribe
        /// or dispose a subscription from inside its own OnNext without
        /// invalidating the iteration; such changes take effect from the next
        /// event on.
        /// </remarks>
        /// <param name="value">The event to process</param>
        public void OnNext(T value)
        {
            if (value == null)
            {
                Exceptions.Throw(new ArgumentNullException("value"), LOGGER);
            }

            lock (_mutex)
            {
                // If OnCompleted or OnError called, do nothing
                if (_completed)
                {
                    return;
                }

                // Check that the event type has been subscribed
                Type eventType = value.GetType();
                List<object> observers;
                if (!_classToObserversMap.TryGetValue(eventType, out observers))
                {
                    Exceptions.Throw(new ArgumentException("No event for type " + eventType), LOGGER);
                }

                // The closed IObserver<eventType>.OnNext is the same for every
                // observer in this list, so resolve it once per event.
                MethodInfo onNext = typeof(IObserver<>)
                    .MakeGenericType(new[] { eventType })
                    .GetMethod("OnNext");
                object[] arguments = new[] { (object)value };

                // Iterate over a copy: the monitor is re-entrant, so an observer
                // calling Subscribe/Dispose on this thread would otherwise modify
                // the list being enumerated.
                foreach (object observer in observers.ToArray())
                {
                    onNext.Invoke(observer, arguments);
                }
            }
        }

        /// <summary>
        /// Subscribe an IObserver for an event type
        /// </summary>
        /// <typeparam name="U">The event type</typeparam>
        /// <param name="observer">The observer to handle the event</param>
        /// <returns>An IDisposable object used to handle unsubscribing
        /// the IObserver</returns>
        public IDisposable Subscribe<U>(IObserver<U> observer) where U : T
        {
            lock (_mutex)
            {
                List<object> observers;
                if (!_classToObserversMap.TryGetValue(typeof(U), out observers))
                {
                    observers = new List<object>();
                    _classToObserversMap[typeof(U)] = observers;
                }
                observers.Add(observer);
            }

            return new DisposableResource<U>(_classToObserversMap, observer, _mutex);
        }

        /// <summary>
        /// Utility class to handle disposing of an IObserver
        /// </summary>
        private class DisposableResource<U> : IDisposable
        {
            private readonly Dictionary<Type, List<object>> _observersMap;
            private readonly IObserver<U> _observer;
            private readonly object _mutex;
            private bool _disposed;

            public DisposableResource(Dictionary<Type, List<object>> observersMap, IObserver<U> observer, object mutex)
            {
                _observersMap = observersMap;
                _observer = observer;
                _mutex = mutex;
                _disposed = false;
            }

            /// <summary>
            /// Unsubscribe the IObserver from the observer map. Only the first
            /// call removes a registration; later calls do nothing.
            /// </summary>
            public void Dispose()
            {
                lock (_mutex)
                {
                    // The flag is tested and set under the lock so that two
                    // racing Dispose calls cannot both run Remove, which would
                    // drop a second registration of the same observer.
                    if (_disposed)
                    {
                        return;
                    }
                    _disposed = true;

                    List<object> observers;
                    if (_observersMap.TryGetValue(typeof(U), out observers))
                    {
                        observers.Remove(_observer);
                    }
                }
            }
        }
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;

// File: lang/cs/Source/WAKE/Wake/RX/Impl/RxSyncStage.cs
namespace Org.Apache.Reef.Wake.RX.Impl
{
    /// <summary>
    /// Rx stage that hands each notification to its observer directly,
    /// on the thread that called the stage.
    /// </summary>
    public class RxSyncStage<T> : AbstractRxStage<T>
    {
        private readonly IObserver<T> _target;

        /// <summary>
        /// Creates the stage around an observer. The observer's full type name
        /// is passed to the base constructor.
        /// </summary>
        /// <param name="observer">the observer that receives the notifications</param>
        public RxSyncStage(IObserver<T> observer) : base(observer.GetType().FullName)
        {
            _target = observer;
        }

        /// <summary>
        /// Runs the base stage's OnNext and then passes the value to the observer.
        /// </summary>
        /// <param name="value">the value to deliver</param>
        public override void OnNext(T value)
        {
            base.OnNext(value);
            _target.OnNext(value);
        }

        /// <summary>
        /// Passes an error notification straight to the observer.
        /// </summary>
        /// <param name="error">the error to deliver</param>
        public override void OnError(Exception error)
        {
            _target.OnError(error);
        }

        /// <summary>
        /// Passes the completion notification straight to the observer.
        /// </summary>
        public override void OnCompleted()
        {
            _target.OnCompleted();
        }

        /// <summary>
        /// Closes the stage. This stage holds nothing to release.
        /// </summary>
        public override void Dispose()
        {
        }
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using Org.Apache.Reef.Utilities.Diagnostics;
using Org.Apache.Reef.Utilities.Logging;
using Org.Apache.Reef.Wake.Util;
using System;

// File: lang/cs/Source/WAKE/Wake/RX/Impl/RxThreadPoolStage.cs
namespace Org.Apache.Reef.Wake.RX.Impl
{
    /// <summary>
    /// Rx stage that submits every observer notification to a task service
    /// backed by a fixed number of threads.
    /// </summary>
    public class RxThreadPoolStage<T> : AbstractRxStage<T>
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(RxThreadPoolStage<T>));

        private readonly IObserver<T> _observer;

        private readonly ITaskService _taskService;

        /// <summary>
        /// Creates the stage and its fixed-size task service. The observer's
        /// full type name is passed to the base constructor.
        /// </summary>
        /// <param name="observer">the observer whose callbacks are executed</param>
        /// <param name="numThreads">the number of threads; must be greater than 0</param>
        public RxThreadPoolStage(IObserver<T> observer, int numThreads)
            : base(observer.GetType().FullName)
        {
            if (numThreads <= 0)
            {
                Exceptions.Throw(new WakeRuntimeException("numThreads " + numThreads + " is less than or equal to 0"), LOGGER);
            }

            _observer = observer;
            _taskService = new FixedThreadPoolTaskService(numThreads);
        }

        /// <summary>
        /// Runs the base stage's OnNext, then submits delivery of the value to
        /// the task service.
        /// </summary>
        /// <param name="value">the value to deliver</param>
        public override void OnNext(T value)
        {
            base.OnNext(value);

            NextTask task = new NextTask(_observer, value);
            _taskService.Execute(task.Start);
        }

        /// <summary>
        /// Submits delivery of an error notification to the task service.
        /// </summary>
        /// <param name="error">the error to deliver</param>
        public override void OnError(Exception error)
        {
            ErrorTask task = new ErrorTask(_observer, error);
            _taskService.Execute(task.Start);
        }

        /// <summary>
        /// Submits delivery of the completion notification to the task service.
        /// </summary>
        public override void OnCompleted()
        {
            CompletedTask task = new CompletedTask(_observer);
            _taskService.Execute(task.Start);
        }

        /// <summary>
        /// Closes the stage by shutting the task service down.
        /// </summary>
        public override void Dispose()
        {
            _taskService.Shutdown();
        }

        /// <summary>Unit of work that delivers one value to the observer.</summary>
        private sealed class NextTask : IStartable
        {
            private readonly IObserver<T> _receiver;
            private readonly T _payload;

            public NextTask(IObserver<T> receiver, T payload)
            {
                _receiver = receiver;
                _payload = payload;
            }

            public void Start()
            {
                _receiver.OnNext(_payload);
            }
        }

        /// <summary>Unit of work that delivers one error to the observer.</summary>
        private sealed class ErrorTask : IStartable
        {
            private readonly IObserver<T> _receiver;
            private readonly Exception _failure;

            public ErrorTask(IObserver<T> receiver, Exception failure)
            {
                _receiver = receiver;
                _failure = failure;
            }

            public void Start()
            {
                _receiver.OnError(_failure);
            }
        }

        /// <summary>Unit of work that delivers the completion signal to the observer.</summary>
        private sealed class CompletedTask : IStartable
        {
            private readonly IObserver<T> _receiver;

            public CompletedTask(IObserver<T> receiver)
            {
                _receiver = receiver;
            }

            public void Start()
            {
                _receiver.OnCompleted();
            }
        }
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.Timers;

using Org.Apache.Reef.Wake.Impl;

// File: lang/cs/Source/WAKE/Wake/RX/Impl/RxTimerStage.cs
namespace Org.Apache.Reef.Wake.RX.Impl
{
    /// <summary>
    /// Timer stage that provides events to the observer periodically.
    /// The same <see cref="PeriodicEvent"/> instance is passed on every tick.
    /// </summary>
    public class RxTimerStage : IStage, IStaticObservable
    {
        private readonly Timer _timer;
        private readonly PeriodicEvent _value = new PeriodicEvent();
        private readonly IObserver<PeriodicEvent> _observer;
        private readonly long _period;

        // 1 while the timer is still running on the initial-delay interval,
        // 0 once it has been switched to the regular period.
        private int _initialTickPending;

        /// <summary>
        /// Constructs a Rx timer stage without an initial delay: the first
        /// event is raised one period after construction.
        /// </summary>
        /// <param name="observer">the observer</param>
        /// <param name="period">the period in milli-seconds</param>
        public RxTimerStage(IObserver<PeriodicEvent> observer, long period)
            : this(observer, 0, period)
        {
        }

        /// <summary>Constructs a Rx timer stage</summary>
        /// <param name="observer">the observer</param>
        /// <param name="initialDelay">
        /// the delay in milli-seconds before the first event. Values less than
        /// or equal to 0 mean "no separate initial delay": the first event is
        /// raised after one <paramref name="period"/>.
        /// </param>
        /// <param name="period">the period in milli-seconds</param>
        public RxTimerStage(IObserver<PeriodicEvent> observer, long initialDelay, long period)
        {
            _observer = observer;
            _period = period;

            // Constructing with the period first keeps the argument validation
            // performed by the Timer constructor on the period itself.
            _timer = new Timer(period);

            // System.Timers.Timer has no separate due time, so a positive initial
            // delay is emulated by using it as the first interval and switching
            // to the period when the first tick arrives.
            if (initialDelay > 0 && initialDelay != period)
            {
                _timer.Interval = initialDelay;
                _initialTickPending = 1;
            }

            _timer.Elapsed += OnTimedEvent;
            _timer.Enabled = true;
        }

        /// <summary>
        /// Closes the stage: stops the timer and releases its resources.
        /// </summary>
        public void Dispose()
        {
            _timer.Stop();
            _timer.Dispose();
        }

        /// <summary>
        /// Timer callback: switches from the initial delay to the regular
        /// period on the first tick, then notifies the observer.
        /// </summary>
        private void OnTimedEvent(object source, ElapsedEventArgs e)
        {
            // Elapsed may be raised on different threads; the atomic exchange
            // guarantees the interval is switched exactly once.
            if (System.Threading.Interlocked.CompareExchange(ref _initialTickPending, 0, 1) == 1)
            {
                _timer.Interval = _period;
            }

            _observer.OnNext(_value);
        }
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;

// File: lang/cs/Source/WAKE/Wake/RX/Impl/SimpleSubject.cs
namespace Org.Apache.Reef.Wake.RX.Impl
{
    /// <summary>
    /// A subject that relays every notification it receives, unchanged, to
    /// the single observer supplied at construction.
    /// </summary>
    public class SimpleSubject<T> : ISubject<T, T>
    {
        private readonly IObserver<T> _sink;

        /// <summary>Creates a subject that relays to the given observer.</summary>
        /// <param name="observer">the observer that receives the relayed notifications</param>
        public SimpleSubject(IObserver<T> observer)
        {
            _sink = observer;
        }

        /// <summary>Relays the completion notification to the observer.</summary>
        public virtual void OnCompleted()
        {
            _sink.OnCompleted();
        }

        /// <summary>Relays an error notification to the observer.</summary>
        /// <param name="error">the error to relay</param>
        public virtual void OnError(Exception error)
        {
            _sink.OnError(error);
        }

        /// <summary>Relays a value to the observer.</summary>
        /// <param name="value">the value to relay</param>
        public virtual void OnNext(T value)
        {
            _sink.OnNext(value);
        }
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;

// File: lang/cs/Source/WAKE/Wake/RX/ObserverCompletedException.cs
namespace Org.Apache.Reef.Wake.RX
{
    /// <summary>
    /// It is illegal to call OnError() or OnCompleted() when a call to OnNext() is
    /// still outstanding, or to call OnNext(), OnError() or OnCompleted() after a
    /// call to OnError() or OnCompleted() has been dispatched.
    /// </summary>
    /// <remarks>
    /// Observers may throw an ObserverCompleted exception whenever this API is
    /// violated. Violating the API leaves the Observer (and any resources that
    /// it holds) in an undefined state, and throwing ObserverCompleted
    /// exceptions is optional.
    /// Callers receiving this exception should simply pass it up the stack to the
    /// Aura runtime. They should not attempt to forward it on to upstream or
    /// downstream stages. The easiest way to do this is to ignore the exception
    /// entirely.
    /// </remarks>
    [System.Serializable]
    public class ObserverCompletedException : InvalidOperationException
    {
        /// <summary>Creates the exception with the default message.</summary>
        public ObserverCompletedException()
        {
        }

        /// <summary>Creates the exception with a specific message.</summary>
        /// <param name="message">the message describing the violation</param>
        public ObserverCompletedException(string message)
            : base(message)
        {
        }

        /// <summary>Creates the exception with a message and the exception that caused it.</summary>
        /// <param name="message">the message describing the violation</param>
        /// <param name="innerException">the underlying cause</param>
        public ObserverCompletedException(string message, Exception innerException)
            : base(message, innerException)
        {
        }

        /// <summary>
        /// Deserialization constructor. A type marked [Serializable] that derives
        /// from Exception needs it, otherwise deserializing an instance fails.
        /// </summary>
        /// <param name="info">the serialized object data</param>
        /// <param name="context">the source or destination context</param>
        protected ObserverCompletedException(
            System.Runtime.Serialization.SerializationInfo info,
            System.Runtime.Serialization.StreamingContext context)
            : base(info, context)
        {
        }
    }
}

// File: lang/cs/Source/WAKE/Wake/Remote/ICodec.cs
namespace Org.Apache.Reef.Wake.Remote
{
    /// <summary>
    /// Non-generic base interface of <see cref="ICodec{T}"/>; declares no members.
    /// </summary>
    public interface ICodec
    {
    }

    /// <summary>
    /// Interface for serialization routines that translate back and forth between
    /// byte arrays with low latency. It combines <see cref="IEncoder{T}"/> and
    /// <see cref="IDecoder{T}"/> for the same type.
    /// </summary>
    /// <typeparam name="T">The codec type</typeparam>
    public interface ICodec<T> : ICodec, IEncoder<T>, IDecoder<T>
    {
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using Org.Apache.Reef.Tang.Annotations;
using Org.Apache.Reef.Wake.Remote.Impl;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

// File: lang/cs/Source/WAKE/Wake/Remote/ICodecFactory.cs
namespace Org.Apache.Reef.Wake.Remote
{
    /// <summary>
    /// Factory interface with a single untyped <see cref="Create"/> method.
    /// <see cref="ByteCodecFactory"/> is declared as its default implementation.
    /// </summary>
    [DefaultImplementation(typeof(ByteCodecFactory))]
    public interface ICodecFactory
    {
        /// <summary>Creates an object.</summary>
        /// <returns>
        /// the created instance, typed as <c>object</c>
        /// (NOTE(review): presumably a codec — confirm against the implementations)
        /// </returns>
        object Create();
    }
}

// File: lang/cs/Source/WAKE/Wake/Remote/IDecoder.cs
namespace Org.Apache.Reef.Wake.Remote
{
    /// <summary>
    /// Non-generic base interface of <see cref="IDecoder{T}"/>; declares no members.
    /// </summary>
    public interface IDecoder
    {
    }

    /// <summary>
    /// Decoding half of a serialization routine: turns a byte array into
    /// an object of type <typeparamref name="T"/>.
    /// </summary>
    /// <typeparam name="T">The decoder type</typeparam>
    public interface IDecoder<T> : IDecoder
    {
        /// <summary>Decodes the given byte array into an object</summary>
        /// <param name="data">the bytes to decode</param>
        /// <returns>the decoded object</returns>
        T Decode(byte[] data);
    }
}

// File: lang/cs/Source/WAKE/Wake/Remote/IEncoder.cs
namespace Org.Apache.Reef.Wake.Remote
{
    /// <summary>
    /// Non-generic base interface of <see cref="IEncoder{T}"/>; declares no members.
    /// </summary>
    public interface IEncoder
    {
    }

    /// <summary>
    /// Encoding half of a serialization routine: turns an object of type
    /// <typeparamref name="T"/> into a byte array.
    /// </summary>
    /// <typeparam name="T">The encoder type</typeparam>
    public interface IEncoder<T> : IEncoder
    {
        /// <summary>Encodes the given object into a byte array</summary>
        /// <param name="obj">the object to encode</param>
        /// <returns>a byte[] representation of the object</returns>
        byte[] Encode(T obj);
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Org.Apache.Reef.Wake.Remote.Impl;

// File: lang/cs/Source/WAKE/Wake/Remote/ILink.cs
namespace Org.Apache.Reef.Wake.Remote
{
    /// <summary>
    /// A disposable connection between two endpoints over which values of type
    /// <typeparamref name="T"/> can be written and read.
    /// </summary>
    /// <typeparam name="T">The type of the values exchanged over the link</typeparam>
    public interface ILink<T> : IDisposable
    {
        /// <summary>
        /// Gets the socket address of the local side of the link
        /// </summary>
        IPEndPoint LocalEndpoint { get; }

        /// <summary>
        /// Gets the socket address of the remote side of the link
        /// </summary>
        IPEndPoint RemoteEndpoint { get; }

        /// <summary>
        /// Asynchronously writes a value to the link
        /// </summary>
        /// <param name="value">The data to write</param>
        /// <param name="token">The cancellation token</param>
        /// <returns>A task representing the write</returns>
        Task WriteAsync(T value, CancellationToken token);

        /// <summary>
        /// Synchronously writes a value to the link
        /// </summary>
        /// <param name="value">The data to write</param>
        void Write(T value);

        /// <summary>
        /// Asynchronously reads a value from the link
        /// </summary>
        /// <param name="token">The cancellation token</param>
        /// <returns>A task whose result is the read data</returns>
        Task<T> ReadAsync(CancellationToken token);

        /// <summary>
        /// Synchronously reads a value from the link
        /// </summary>
        /// <returns>The read data</returns>
        T Read();
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;

// File: lang/cs/Source/WAKE/Wake/Remote/IRemoteEvent.cs
namespace Org.Apache.Reef.Wake.Remote
{
    /// <summary>
    /// An event carrying a value of type <typeparamref name="T"/> together
    /// with its local and remote endpoints, source and sink names and a
    /// sequence number.
    /// </summary>
    /// <typeparam name="T">The type of the carried value</typeparam>
    public interface IRemoteEvent<T>
    {
        /// <summary>Gets or sets the local endpoint of the event.</summary>
        IPEndPoint LocalEndPoint { get; set; }

        /// <summary>Gets or sets the remote endpoint of the event.</summary>
        IPEndPoint RemoteEndPoint { get; set; }

        /// <summary>Gets the source name of the event.</summary>
        string Source { get; }

        /// <summary>Gets the sink name of the event.</summary>
        string Sink { get; }

        /// <summary>Gets the value carried by the event.</summary>
        T Value { get; }

        /// <summary>Gets the sequence number of the event.</summary>
        long Sequence { get; }
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

// File: lang/cs/Source/WAKE/Wake/Remote/IRemoteIdentifier.cs
namespace Org.Apache.Reef.Wake.Remote
{
    /// <summary>
    /// Identifier that stands for a remote source. It is an abstract class
    /// (despite the interface-style name) deriving from IIdentifier and adds
    /// no members of its own.
    /// </summary>
    public abstract class IRemoteIdentifier : IIdentifier
    {
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

// File: lang/cs/Source/WAKE/Wake/Remote/IRemoteIdentifierFactory.cs
namespace Org.Apache.Reef.Wake.Remote
{
    /// <summary>
    /// Factory for remote identifiers. It extends IIdentifierFactory and adds
    /// no members of its own.
    /// </summary>
    public interface IRemoteIdentifierFactory : IIdentifierFactory
    {
    }
}
- */ - -using System; -using System.Net; -using Org.Apache.Reef.Wake.Remote.Impl; - -namespace Org.Apache.Reef.Wake.Remote -{ - public interface IRemoteManager<T> : IStage - { - IRemoteIdentifier Identifier { get; } - - IPEndPoint LocalEndpoint { get; } - - IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> dest); - - IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint); - - IDisposable RegisterObserver(RemoteEventEndPoint<T> source, IObserver<T> theObserver); - - IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> theObserver); - - IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> theObserver); - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/IRemoteMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/IRemoteMessage.cs b/lang/cs/Source/WAKE/Wake/Remote/IRemoteMessage.cs deleted file mode 100644 index 4b3d2a3..0000000 --- a/lang/cs/Source/WAKE/Wake/Remote/IRemoteMessage.cs +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -namespace Org.Apache.Reef.Wake.Remote -{ - /// <summary> - /// Message received from a remote handler - /// </summary> - public interface IRemoteMessage<T> - { - /// <summary> - /// Returns a remote identifier of the sender - /// </summary> - /// <returns>The remote identifier</returns> - IRemoteIdentifier Identifier { get; } - - /// <summary> - /// Returns an actual message - /// </summary> - /// <returns>The remote message</returns> - T Message { get; } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/ISubscriptionManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/ISubscriptionManager.cs b/lang/cs/Source/WAKE/Wake/Remote/ISubscriptionManager.cs deleted file mode 100644 index 8d859e2..0000000 --- a/lang/cs/Source/WAKE/Wake/Remote/ISubscriptionManager.cs +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -namespace Org.Apache.Reef.Wake.Remote -{ - public interface ISubscriptionManager - { - void Unsubscribe(object token); - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/ByteCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/ByteCodec.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/ByteCodec.cs deleted file mode 100644 index e596ab7..0000000 --- a/lang/cs/Source/WAKE/Wake/Remote/Impl/ByteCodec.cs +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -using Org.Apache.Reef.Tang.Annotations; - -namespace Org.Apache.Reef.Wake.Remote.Impl -{ - public class ByteCodec : ICodec<byte[]> - { - [Inject] - public ByteCodec() - { - } - - public byte[] Encode(byte[] obj) - { - return obj; - } - - public byte[] Decode(byte[] data) - { - return data; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/ByteCodecFactory.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/ByteCodecFactory.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/ByteCodecFactory.cs deleted file mode 100644 index 333f341..0000000 --- a/lang/cs/Source/WAKE/Wake/Remote/Impl/ByteCodecFactory.cs +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.Reef.Tang.Annotations; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace Org.Apache.Reef.Wake.Remote.Impl -{ - public class ByteCodecFactory : ICodecFactory - { - [Inject] - public ByteCodecFactory() - { - } - - public object Create() - { - return new ByteCodec(); - } - } -}
