/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

//------------------------------------------------------------------------------
// <auto-generated>
//     This code was generated by a tool.
//
//     Changes to this file may cause incorrect behavior and will be lost if
//     the code is regenerated.
// </auto-generated>
//------------------------------------------------------------------------------

// Generated from: src/main/proto/RemoteProtocol.proto
// NOTE(review): the comments below were added by hand for readability; they will be lost
// when the file is regenerated, so the .proto file remains the authoritative description.
namespace Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos
{
    /// <summary>
    /// protobuf-net contract for the "WakeMessagePBuf" message: a required byte payload and
    /// sequence number plus optional source and sink strings.
    /// </summary>
    [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"WakeMessagePBuf")]
    public partial class WakeMessagePBuf : global::ProtoBuf.IExtensible
    {
        public WakeMessagePBuf() {}

        private byte[] _data;
        /// <summary>Field 1 (required): the raw payload bytes.</summary>
        [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"data", DataFormat = global::ProtoBuf.DataFormat.Default)]
        public byte[] data
        {
            get { return _data; }
            set { _data = value; }
        }
        private long _seq;
        /// <summary>
        /// Field 2 (required), encoded with the TwosComplement data format.
        /// Presumably a message sequence number - confirm against RemoteProtocol.proto.
        /// </summary>
        [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"seq", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
        public long seq
        {
            get { return _seq; }
            set { _seq = value; }
        }
        private string _source = "";
        /// <summary>
        /// Field 3 (optional, defaults to the empty string).
        /// Presumably identifies the sender - confirm against RemoteProtocol.proto.
        /// </summary>
        [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"source", DataFormat = global::ProtoBuf.DataFormat.Default)]
        [global::System.ComponentModel.DefaultValue("")]
        public string source
        {
            get { return _source; }
            set { _source = value; }
        }
        private string _sink = "";
        /// <summary>
        /// Field 4 (optional, defaults to the empty string).
        /// Presumably identifies the receiver - confirm against RemoteProtocol.proto.
        /// </summary>
        [global::ProtoBuf.ProtoMember(4, IsRequired = false, Name=@"sink", DataFormat = global::ProtoBuf.DataFormat.Default)]
        [global::System.ComponentModel.DefaultValue("")]
        public string sink
        {
            get { return _sink; }
            set { _sink = value; }
        }
        // Backing store handed to ProtoBuf.Extensible through the explicit IExtensible implementation below.
        private global::ProtoBuf.IExtension extensionObject;
        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
            { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
    }

    /// <summary>
    /// protobuf-net contract for the "WakeTuplePBuf" message: a required class name paired
    /// with a required byte payload.
    /// </summary>
    [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"WakeTuplePBuf")]
    public partial class WakeTuplePBuf : global::ProtoBuf.IExtensible
    {
        public WakeTuplePBuf() {}

        private string _className;
        /// <summary>
        /// Field 1 (required).
        /// Presumably the type name of the object serialized in <see cref="data"/> - confirm with callers.
        /// </summary>
        [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"className", DataFormat = global::ProtoBuf.DataFormat.Default)]
        public string className
        {
            get { return _className; }
            set { _className = value; }
        }
        private byte[] _data;
        /// <summary>Field 2 (required): the raw payload bytes.</summary>
        [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"data", DataFormat = global::ProtoBuf.DataFormat.Default)]
        public byte[] data
        {
            get { return _data; }
            set { _data = value; }
        }
        // Backing store handed to ProtoBuf.Extensible through the explicit IExtensible implementation below.
        private global::ProtoBuf.IExtension extensionObject;
        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
            { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
    }

}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using Org.Apache.Reef.Utilities.Logging;
using System;

namespace Org.Apache.Reef.Wake.RX
{
    /// <summary>
    /// Base class for observers that only need to implement <see cref="OnNext"/>.
    /// The default <see cref="OnError"/> and <see cref="OnCompleted"/> implementations
    /// do nothing but write a log entry; both are virtual and may be overridden.
    /// </summary>
    /// <typeparam name="T">The type of the values delivered to the observer</typeparam>
    public abstract class AbstractObserver<T> : IObserver<T>
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(AbstractObserver<T>));

        /// <summary>Logs the received error at Info level; the error is not propagated.</summary>
        /// <param name="error">The error reported by the provider</param>
        public virtual void OnError(Exception error)
        {
            // The leading space keeps the runtime type name from running into the message text.
            LOGGER.Log(Level.Info, "The observer " + GetType() + " has received an Exception: " + error);
        }

        /// <summary>Logs the completion notification at Verbose level.</summary>
        public virtual void OnCompleted()
        {
            LOGGER.Log(Level.Verbose, "The observer " + GetType() + " has received an onCompleted()");
        }

        /// <summary>Handles the next value; must be supplied by the concrete observer.</summary>
        /// <param name="arg1">The value pushed by the provider</param>
        public abstract void OnNext(T arg1);
    }
}
using System;

namespace Org.Apache.Reef.Wake.RX
{
    /// <summary>
    /// Common base for Rx stages. It was designed to meter every event, but the meter
    /// field and all calls on it are currently commented out, so the base class itself
    /// performs no work.
    /// </summary>
    /// <typeparam name="T">The type of the events handled by the stage</typeparam>
    public abstract class AbstractRxStage<T> : IRxStage<T>
    {
        // Metering is disabled; the intended field was:
        //protected internal readonly Meter meter;

        /// <summary>Constructs an abstract Rx stage</summary>
        /// <param name="meterName">the name of the meter (unused while metering is disabled)</param>
        public AbstractRxStage(string meterName)
        {
            // Metering is disabled; the intended initialization was:
            //meter = new Meter(meterName);
        }

        /// <summary>Forwards the completion notification; supplied by the concrete stage.</summary>
        public abstract void OnCompleted();

        /// <summary>Forwards the error notification; supplied by the concrete stage.</summary>
        /// <param name="error">the error</param>
        public abstract void OnError(Exception error);

        /// <summary>
        /// Hook that would update the meter for each event; a no-op while metering is disabled.
        /// </summary>
        /// <param name="value">the event</param>
        public virtual void OnNext(T value)
        {
            // Metering is disabled; the intended call was:
            //meter.Mark(1);
        }

        /// <summary>Releases the stage's resources; the base implementation has none.</summary>
        public virtual void Dispose()
        {
            // no op
        }
    }
}
using System;

namespace Org.Apache.Reef.Wake.RX
{
    /// <summary>
    /// A stage that is driven through the <see cref="IObserver{T}"/> contract: it combines
    /// <see cref="IObserver{T}"/> with <c>IStage</c> and adds no members of its own.
    /// </summary>
    /// <typeparam name="T">The type of the events pushed into the stage</typeparam>
    public interface IRxStage<T> : IObserver<T>, IStage
    {
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Org.Apache.Reef.Wake.RX
{
    /// <summary>
    /// Marker interface with no members. Types implementing it are tagged as observables
    /// whose observer is fixed up front rather than subscribed later.
    /// NOTE(review): that reading is inferred from the name - confirm against the Wake design notes.
    /// </summary>
    public interface IStaticObservable
    {
        //intentionally empty
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Subjects;
using System.Reflection;
using System.Text;
using Org.Apache.Reef.Utilities.Diagnostics;
using Org.Apache.Reef.Utilities.Logging;

namespace Org.Apache.Reef.Wake.RX
{
    /// <summary>An observer that is also tagged as an <see cref="IStaticObservable"/></summary>
    /// <typeparam name="In">The in type</typeparam>
    /// <typeparam name="Out">The out type</typeparam>
    public interface ISubject<In, Out> : IObserver<In>, IStaticObservable
    {
        // intentionally empty
    }
}

namespace Org.Apache.Reef.Wake.RX.Impl
{
    /// <summary>
    /// Subject providing a publish/subscribe interface keyed by event type.
    /// Observers subscribe for a concrete event type; <see cref="OnNext"/> looks up the
    /// observers registered for the exact runtime type of the event and invokes them.
    /// All state is guarded by a single lock, and handlers are invoked while it is held.
    /// </summary>
    /// <typeparam name="T">The super type that all event types
    /// inherit from</typeparam>
    public class PubSubSubject<T> : IObserver<T>
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(PubSubSubject<T>));

        private readonly Dictionary<Type, List<object>> _classToObserversMap;
        private readonly object _mutex;
        private bool _completed;

        /// <summary>
        /// Constructs a pub-sub Subject
        /// </summary>
        public PubSubSubject()
        {
            _classToObserversMap = new Dictionary<Type, List<object>>();
            _mutex = new object();
        }

        /// <summary>
        /// Marks the subject as completed; later calls to <see cref="OnNext"/> are ignored.
        /// The notification is not forwarded to the subscribed observers.
        /// </summary>
        public void OnCompleted()
        {
            lock (_mutex)
            {
                _completed = true;
            }
        }

        /// <summary>
        /// Marks the subject as completed; later calls to <see cref="OnNext"/> are ignored.
        /// The error itself is neither logged nor forwarded to the subscribed observers.
        /// </summary>
        /// <param name="error">The reported error (unused)</param>
        public void OnError(Exception error)
        {
            lock (_mutex)
            {
                _completed = true;
            }
        }

        /// <summary>
        /// Invoke the subscribed handlers for the event class type
        /// </summary>
        /// <param name="value">The event to process; null and event types that were never
        /// subscribed are reported through <c>Exceptions.Throw</c> as ArgumentNullException and
        /// ArgumentException respectively</param>
        public void OnNext(T value)
        {
            if (value == null)
            {
                Exceptions.Throw(new ArgumentNullException("value"), LOGGER);
            }

            lock (_mutex)
            {
                // If OnCompleted or OnError called, do nothing
                if (_completed)
                {
                    return;
                }

                // Check that the event type has been subscribed
                Type eventType = value.GetType();
                List<object> handlers;
                if (!_classToObserversMap.TryGetValue(eventType, out handlers))
                {
                    Exceptions.Throw(new ArgumentException("No event for type " + eventType), LOGGER);
                }

                // The reflection lookup is the same for every handler, so resolve it once.
                MethodInfo onNext = typeof(IObserver<>).MakeGenericType(new[] { eventType }).GetMethod("OnNext");
                object[] arguments = new[] { (object)value };

                // Iterate over a snapshot: the lock is re-entrant, so a handler may subscribe or
                // dispose a subscription from inside its OnNext, which would otherwise modify
                // the list while it is being enumerated.
                foreach (object handler in handlers.ToArray())
                {
                    onNext.Invoke(handler, arguments);
                }
            }
        }

        /// <summary>
        /// Subscribe an IObserver for an event type
        /// </summary>
        /// <typeparam name="U">The event type</typeparam>
        /// <param name="observer">The observer to handle the event</param>
        /// <returns>An IDisposable object used to handle unsubscribing
        /// the IObserver</returns>
        public IDisposable Subscribe<U>(IObserver<U> observer) where U : T
        {
            lock (_mutex)
            {
                List<object> observers;
                if (!_classToObserversMap.TryGetValue(typeof(U), out observers))
                {
                    observers = new List<object>();
                    _classToObserversMap[typeof(U)] = observers;
                }
                observers.Add(observer);
            }

            return new DisposableResource<U>(_classToObserversMap, observer, _mutex);
        }

        /// <summary>
        /// Subscription handle: disposing it removes the observer from the shared observer map.
        /// </summary>
        private class DisposableResource<U> : IDisposable
        {
            private readonly Dictionary<Type, List<object>> _observersMap;
            private readonly IObserver<U> _observer;
            private readonly object _mutex;
            private bool _disposed;

            public DisposableResource(Dictionary<Type, List<object>> observersMap, IObserver<U> observer, object mutex)
            {
                _observersMap = observersMap;
                _observer = observer;
                _mutex = mutex;
                _disposed = false;
            }

            /// <summary>
            /// Unsubscribe the IObserver from the observer map. Only the first call has an effect.
            /// </summary>
            public void Dispose()
            {
                lock (_mutex)
                {
                    // The flag is checked under the same lock as the map, so concurrent Dispose
                    // calls cannot both remove an entry.
                    if (_disposed)
                    {
                        return;
                    }
                    _disposed = true;

                    List<object> observers;
                    if (_observersMap.TryGetValue(typeof(U), out observers))
                    {
                        observers.Remove(_observer);
                    }
                }
            }
        }
    }
}
using System;

namespace Org.Apache.Reef.Wake.RX.Impl
{
    /// <summary>
    /// Rx stage that hands every notification to the wrapped observer directly on the
    /// calling thread.
    /// </summary>
    /// <typeparam name="T">The type of the events handled by the stage</typeparam>
    public class RxSyncStage<T> : AbstractRxStage<T>
    {
        private readonly IObserver<T> _target;

        /// <summary>Constructs a Rx synchronous stage</summary>
        /// <param name="observer">the observer that receives all notifications; its runtime
        /// type name is passed to the base class as the meter name, so it must not be null</param>
        public RxSyncStage(IObserver<T> observer) : base(observer.GetType().FullName)
        {
            this._target = observer;
        }

        /// <summary>Closes the stage. Nothing is held, so there is nothing to release.</summary>
        public override void Dispose()
        {
        }

        /// <summary>Relays the completion notification to the observer.</summary>
        public override void OnCompleted()
        {
            this._target.OnCompleted();
        }

        /// <summary>Relays the error notification to the observer.</summary>
        /// <param name="error">the error</param>
        public override void OnError(Exception error)
        {
            this._target.OnError(error);
        }

        /// <summary>Runs the base-class hook, then relays the value to the observer.</summary>
        /// <param name="value">the new value</param>
        public override void OnNext(T value)
        {
            base.OnNext(value);
            this._target.OnNext(value);
        }
    }
}
using Org.Apache.Reef.Utilities.Diagnostics;
using Org.Apache.Reef.Utilities.Logging;
using Org.Apache.Reef.Wake.Util;
using System;

namespace Org.Apache.Reef.Wake.RX.Impl
{
    /// <summary>
    /// Rx stage that hands every notification to the wrapped observer through a
    /// fixed-size thread pool task service instead of the calling thread.
    /// NOTE(review): whether notifications keep their submission order depends on
    /// the task service, which is not visible here.
    /// </summary>
    /// <typeparam name="T">The type of the events handled by the stage</typeparam>
    public class RxThreadPoolStage<T> : AbstractRxStage<T>
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(RxThreadPoolStage<T>));

        private readonly IObserver<T> _observer;

        private readonly ITaskService _taskService;

        /// <summary>Constructs a Rx thread pool stage</summary>
        /// <param name="observer">the observer to execute; its runtime type name is passed to
        /// the base class as the meter name, so it must not be null</param>
        /// <param name="numThreads">the number of threads; values below 1 are reported
        /// through <c>Exceptions.Throw</c> as a WakeRuntimeException</param>
        public RxThreadPoolStage(IObserver<T> observer, int numThreads)
            : base(observer.GetType().FullName)
        {
            _observer = observer;

            bool hasWorkers = numThreads > 0;
            if (!hasWorkers)
            {
                Exceptions.Throw(new WakeRuntimeException("numThreads " + numThreads + " is less than or equal to 0"), LOGGER);
            }

            _taskService = new FixedThreadPoolTaskService(numThreads);
        }

        /// <summary>Runs the base-class hook, then schedules delivery of the value to the observer.</summary>
        /// <param name="value">the new value</param>
        public override void OnNext(T value)
        {
            base.OnNext(value);
            _taskService.Execute(() => _observer.OnNext(value));
        }

        /// <summary>Schedules delivery of the error notification to the observer.</summary>
        /// <param name="error">the error</param>
        public override void OnError(Exception error)
        {
            _taskService.Execute(() => _observer.OnError(error));
        }

        /// <summary>Schedules delivery of the completion notification to the observer.</summary>
        public override void OnCompleted()
        {
            _taskService.Execute(() => _observer.OnCompleted());
        }

        /// <summary>
        /// Closes the stage by shutting down the task service
        /// </summary>
        public override void Dispose()
        {
            _taskService.Shutdown();
        }
    }
}
using System;
using System.Timers;

using Org.Apache.Reef.Wake.Impl;

namespace Org.Apache.Reef.Wake.RX.Impl
{
    /// <summary>Timer stage that provides events to the observer periodically</summary>
    /// <remarks>
    /// The same <c>PeriodicEvent</c> instance is delivered on every tick. Ticks are raised by
    /// a <see cref="Timer"/>, so the observer is not called on the constructing thread.
    /// </remarks>
    public class RxTimerStage : IStage, IStaticObservable
    {
        private readonly Timer _timer;
        private readonly PeriodicEvent _value = new PeriodicEvent();
        private readonly IObserver<PeriodicEvent> _observer;
        private readonly long _period;

        // 1 while the timer still runs with the initial delay as its interval, 0 once the
        // regular period is in effect. Updated with Interlocked because Elapsed handlers
        // may run concurrently.
        private int _switchToPeriodPending;

        /// <summary>Constructs a Rx timer stage whose first event fires one period after construction</summary>
        /// <param name="observer">the observer</param>
        /// <param name="period">the period in milli-seconds</param>
        public RxTimerStage(IObserver<PeriodicEvent> observer, long period)
            : this(observer, 0, period)
        {
        }

        /// <summary>Constructs a Rx timer stage</summary>
        /// <param name="observer">the observer</param>
        /// <param name="initialDelay">the delay in milli-seconds before the first event; a value
        /// of zero or less means the first event fires after one period</param>
        /// <param name="period">the period in milli-seconds between subsequent events</param>
        /// <exception cref="ArgumentOutOfRangeException">initialDelay exceeds Int32.MaxValue</exception>
        public RxTimerStage(IObserver<PeriodicEvent> observer, long initialDelay, long period)
        {
            if (initialDelay > int.MaxValue)
            {
                throw new ArgumentOutOfRangeException("initialDelay", "initialDelay must not exceed Int32.MaxValue milli-seconds");
            }

            _observer = observer;
            _period = period;

            // Constructing the timer with the period first keeps the period validation
            // (and the exception it raises) exactly as before.
            _timer = new Timer(period);

            // The timer rejects non-positive intervals, so only a positive initial delay can be
            // honored; it is used as the interval for the first tick only.
            bool delayedStart = initialDelay > 0 && initialDelay != period;
            if (delayedStart)
            {
                _timer.Interval = initialDelay;
            }
            _switchToPeriodPending = delayedStart ? 1 : 0;

            _timer.Elapsed += OnElapsed;
            _timer.Enabled = true;
        }

        /// <summary>
        /// Closes the stage: stops the timer and releases its resources.
        /// </summary>
        public void Dispose()
        {
            _timer.Stop();
            _timer.Dispose();
        }

        /// <summary>
        /// Handles a timer tick: switches from the initial delay to the regular period on the
        /// first tick, then delivers the event.
        /// </summary>
        private void OnElapsed(object sender, ElapsedEventArgs e)
        {
            // Assigning Interval restarts the countdown, so it must happen exactly once.
            if (System.Threading.Interlocked.Exchange(ref _switchToPeriodPending, 0) == 1)
            {
                _timer.Interval = _period;
            }

            OnTimedEvent(sender, e, _observer, _value);
        }

        private static void OnTimedEvent(object source, ElapsedEventArgs e, IObserver<PeriodicEvent> observer, PeriodicEvent value)
        {
            observer.OnNext(value);
        }
    }
}
using System;

namespace Org.Apache.Reef.Wake.RX.Impl
{
    /// <summary>
    /// A subject that passes every notification it receives straight through to the
    /// single observer supplied at construction.
    /// </summary>
    /// <typeparam name="T">The type of the relayed values</typeparam>
    public class SimpleSubject<T> : ISubject<T, T>
    {
        private readonly IObserver<T> _downstream;

        /// <summary>Constructs a simple subject</summary>
        /// <param name="observer">the observer that receives all relayed notifications</param>
        public SimpleSubject(IObserver<T> observer)
        {
            this._downstream = observer;
        }

        /// <summary>
        /// Tells the observer that no further push-based notifications will be sent.
        /// </summary>
        public virtual void OnCompleted()
        {
            this._downstream.OnCompleted();
        }

        /// <summary>Relays the error to the observer</summary>
        /// <param name="error">the error</param>
        public virtual void OnError(Exception error)
        {
            this._downstream.OnError(error);
        }

        /// <summary>Relays the new value to the observer</summary>
        /// <param name="value">the new value</param>
        public virtual void OnNext(T value)
        {
            this._downstream.OnNext(value);
        }
    }
}
using System;

namespace Org.Apache.Reef.Wake.RX
{
    /// <summary>
    /// It is illegal to call OnError() or OnCompleted() when a call to OnNext() is
    /// still outstanding, or to call OnNext(), OnError() or OnCompleted() after a
    /// call to OnError() or OnCompleted() has been dispatched.
    /// </summary>
    /// <remarks>
    /// Observers may throw an ObserverCompletedException whenever this API is
    /// violated. Violating the API leaves the observer (and any resources that it
    /// holds) in an undefined state, and throwing this exception is optional.
    /// Callers receiving this exception should simply pass it up the stack to the
    /// Aura runtime. They should not attempt to forward it on to upstream or
    /// downstream stages. The easiest way to do this is to ignore the exception
    /// entirely.
    /// </remarks>
    [System.Serializable]
    public class ObserverCompletedException : InvalidOperationException
    {
        /// <summary>Creates the exception with the default message.</summary>
        public ObserverCompletedException()
        {
        }

        /// <summary>Creates the exception with a message describing the violation.</summary>
        /// <param name="message">the error message</param>
        public ObserverCompletedException(string message)
            : base(message)
        {
        }

        /// <summary>Creates the exception with a message and the exception that caused it.</summary>
        /// <param name="message">the error message</param>
        /// <param name="innerException">the underlying cause</param>
        public ObserverCompletedException(string message, Exception innerException)
            : base(message, innerException)
        {
        }

        /// <summary>
        /// Deserialization constructor. A [Serializable] exception needs it so that it can be
        /// reconstructed by the runtime serialization infrastructure.
        /// </summary>
        /// <param name="info">the serialized object data</param>
        /// <param name="context">the source or destination of the serialized stream</param>
        protected ObserverCompletedException(
            System.Runtime.Serialization.SerializationInfo info,
            System.Runtime.Serialization.StreamingContext context)
            : base(info, context)
        {
        }
    }
}

namespace Org.Apache.Reef.Wake.Remote
{
    /// <summary>
    /// Non-generic base interface with no members; it lets codecs be referenced without
    /// knowing their element type.
    /// </summary>
    public interface ICodec
    {
    }

    /// <summary>
    /// Interface for serialization routines that translate back and forth between
    /// byte arrays with low latency. It combines <c>IEncoder&lt;T&gt;</c> and
    /// <c>IDecoder&lt;T&gt;</c> and adds no members of its own.
    /// </summary>
    /// <typeparam name="T">The type of the objects handled by the codec</typeparam>
    public interface ICodec<T> : ICodec, IEncoder<T>, IDecoder<T>
    {
    }
}
using Org.Apache.Reef.Tang.Annotations;
using Org.Apache.Reef.Wake.Remote.Impl;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Org.Apache.Reef.Wake.Remote
{
    /// <summary>
    /// Factory for codec instances. ByteCodecFactory is declared as the default
    /// implementation.
    /// </summary>
    [DefaultImplementation(typeof(ByteCodecFactory))]
    public interface ICodecFactory
    {
        /// <summary>Creates a codec.</summary>
        /// <returns>
        /// The created codec, typed as object; callers must cast it to the codec
        /// interface they expect.
        /// </returns>
        object Create();
    }
}

namespace Org.Apache.Reef.Wake.Remote
{
    /// <summary>
    /// Non-generic base of <see cref="IDecoder{T}"/>; declares no members.
    /// </summary>
    public interface IDecoder
    {
    }

    /// <summary>
    /// Interface for deserialization routines that translate byte arrays
    /// into objects of type T.
    /// </summary>
    /// <typeparam name="T">The type of object produced by decoding</typeparam>
    public interface IDecoder<T> : IDecoder
    {
        /// <summary>Decodes the given byte array into an object</summary>
        /// <param name="data">The serialized bytes to decode</param>
        /// <returns>The decoded object</returns>
        T Decode(byte[] data);
    }
}

namespace Org.Apache.Reef.Wake.Remote
{
    /// <summary>
    /// Non-generic base of <see cref="IEncoder{T}"/>; declares no members.
    /// </summary>
    public interface IEncoder
    {
    }

    /// <summary>
    /// Interface for serialization routines that translate objects of type T
    /// into byte arrays.
    /// </summary>
    /// <typeparam name="T">The type of object accepted for encoding</typeparam>
    public interface IEncoder<T> : IEncoder
    {
        /// <summary>Encodes the given object into a byte array</summary>
        /// <param name="obj">The object to serialize</param>
        /// <returns>A byte[] representation of the object</returns>
        byte[] Encode(T obj);
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Org.Apache.Reef.Wake.Remote.Impl;

namespace Org.Apache.Reef.Wake.Remote
{
    /// <summary>
    /// A disposable connection between two endpoints over which values of type T
    /// can be written and read, either synchronously or asynchronously.
    /// </summary>
    /// <typeparam name="T">The type of value carried by the link</typeparam>
    public interface ILink<T> : IDisposable
    {
        /// <summary>
        /// Gets the socket address of the local side of the link
        /// </summary>
        IPEndPoint LocalEndpoint { get; }

        /// <summary>
        /// Gets the socket address of the remote side of the link
        /// </summary>
        IPEndPoint RemoteEndpoint { get; }

        /// <summary>
        /// Synchronously writes a value to the link
        /// </summary>
        /// <param name="value">The data to write</param>
        void Write(T value);

        /// <summary>
        /// Asynchronously writes a value to the link
        /// </summary>
        /// <param name="value">The data to write</param>
        /// <param name="token">The cancellation token</param>
        /// <returns>A task representing the pending write</returns>
        Task WriteAsync(T value, CancellationToken token);

        /// <summary>
        /// Synchronously reads a value from the link
        /// </summary>
        /// <returns>The read data</returns>
        T Read();

        /// <summary>
        /// Asynchronously reads a value from the link
        /// </summary>
        /// <param name="token">The cancellation token</param>
        /// <returns>A task whose result is the read data</returns>
        Task<T> ReadAsync(CancellationToken token);
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;

namespace Org.Apache.Reef.Wake.Remote
{
    /// <summary>
    /// An event exchanged with a remote host: a value of type T together with
    /// its addressing and sequencing metadata.
    /// </summary>
    /// <typeparam name="T">The type of the value carried by the event</typeparam>
    public interface IRemoteEvent<T>
    {
        /// <summary>
        /// Gets or sets the local socket address associated with the event
        /// </summary>
        IPEndPoint LocalEndPoint { get; set; }

        /// <summary>
        /// Gets or sets the remote socket address associated with the event
        /// </summary>
        IPEndPoint RemoteEndPoint { get; set; }

        /// <summary>
        /// Gets the source of the event as a string
        /// </summary>
        string Source { get; }

        /// <summary>
        /// Gets the sink of the event as a string
        /// </summary>
        string Sink { get; }

        /// <summary>
        /// Gets the value carried by the event
        /// </summary>
        T Value { get; }

        /// <summary>
        /// Gets the sequence number of the event
        /// </summary>
        long Sequence { get; }
    }
}
namespace Org.Apache.Reef.Wake.Remote
{
    /// <summary>
    /// Base type for identifiers that represent a remote source.
    /// </summary>
    /// <remarks>
    /// Despite the I-prefixed name this is an abstract class, not an interface.
    /// It adds no members to IIdentifier.
    /// </remarks>
    public abstract class IRemoteIdentifier : IIdentifier
    {
    }
}
namespace Org.Apache.Reef.Wake.Remote
{
    /// <summary>
    /// Factory that creates a RemoteIdentifier. It adds no members to
    /// IIdentifierFactory.
    /// </summary>
    public interface IRemoteIdentifierFactory : IIdentifierFactory
    {
    }
}
using System;
using System.Net;
using Org.Apache.Reef.Wake.Remote.Impl;

namespace Org.Apache.Reef.Wake.Remote
{
    /// <summary>
    /// A stage that hands out observers for sending messages of type T to remote
    /// hosts and accepts observers for messages arriving from remote hosts.
    /// </summary>
    /// <typeparam name="T">The type of message exchanged with remote hosts</typeparam>
    public interface IRemoteManager<T> : IStage
    {
        /// <summary>
        /// Gets the remote identifier of this manager
        /// </summary>
        IRemoteIdentifier Identifier { get; }

        /// <summary>
        /// Gets the local socket address of this manager
        /// </summary>
        IPEndPoint LocalEndpoint { get; }

        /// <summary>
        /// Returns an observer used to send messages to the remote host at the
        /// given socket address.
        /// </summary>
        /// <param name="remoteEndpoint">The socket address of the remote host</param>
        /// <returns>An observer used to send messages to the remote host</returns>
        IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint);

        /// <summary>
        /// Returns an observer used to send messages to the remote host described
        /// by the given remote event endpoint.
        /// </summary>
        /// <param name="dest">The destination endpoint</param>
        /// <returns>An observer used to send messages to the remote host</returns>
        IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> dest);

        /// <summary>
        /// Registers an observer for messages associated with the given socket address.
        /// </summary>
        /// <param name="remoteEndpoint">The socket address of the remote host</param>
        /// <param name="theObserver">The observer to register</param>
        /// <returns>
        /// A disposable handle for the registration (presumably disposing it
        /// removes the observer; confirm against the implementation).
        /// </returns>
        IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> theObserver);

        /// <summary>
        /// Registers an observer for messages associated with the given remote event endpoint.
        /// </summary>
        /// <param name="source">The endpoint whose messages are of interest</param>
        /// <param name="theObserver">The observer to register</param>
        /// <returns>
        /// A disposable handle for the registration (presumably disposing it
        /// removes the observer; confirm against the implementation).
        /// </returns>
        IDisposable RegisterObserver(RemoteEventEndPoint<T> source, IObserver<T> theObserver);

        /// <summary>
        /// Registers an observer that receives messages wrapped as
        /// IRemoteMessage&lt;T&gt;, which carries the sender's identifier alongside
        /// the message.
        /// </summary>
        /// <param name="theObserver">The observer to register</param>
        /// <returns>
        /// A disposable handle for the registration (presumably disposing it
        /// removes the observer; confirm against the implementation).
        /// </returns>
        IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> theObserver);
    }
}
namespace Org.Apache.Reef.Wake.Remote
{
    /// <summary>
    /// A message received from a remote handler, paired with the identity of
    /// its sender.
    /// </summary>
    /// <typeparam name="T">The type of the message payload</typeparam>
    public interface IRemoteMessage<T>
    {
        /// <summary>
        /// Gets the remote identifier of the sender
        /// </summary>
        /// <value>The remote identifier</value>
        IRemoteIdentifier Identifier { get; }

        /// <summary>
        /// Gets the actual message
        /// </summary>
        /// <value>The remote message</value>
        T Message { get; }
    }
}
namespace Org.Apache.Reef.Wake.Remote
{
    /// <summary>
    /// Implemented by types that can cancel a subscription identified by a token.
    /// </summary>
    public interface ISubscriptionManager
    {
        /// <summary>
        /// Cancels the subscription identified by the given token.
        /// </summary>
        /// <param name="token">
        /// The object identifying the subscription. Its concrete type is defined
        /// by the implementation.
        /// </param>
        void Unsubscribe(object token);
    }
}
using Org.Apache.Reef.Tang.Annotations;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Pass-through codec for byte arrays: both directions hand back the very
    /// array instance they were given, without copying or transforming it.
    /// </summary>
    public class ByteCodec : ICodec<byte[]>
    {
        /// <summary>
        /// Creates the codec. Marked [Inject] so it can be constructed by injection.
        /// </summary>
        [Inject]
        public ByteCodec()
        {
        }

        /// <summary>
        /// Returns the given array unchanged.
        /// </summary>
        /// <param name="obj">The bytes to encode; may be null</param>
        /// <returns>The same reference that was passed in</returns>
        public byte[] Encode(byte[] obj)
        {
            return obj;
        }

        /// <summary>
        /// Returns the given array unchanged.
        /// </summary>
        /// <param name="data">The bytes to decode; may be null</param>
        /// <returns>The same reference that was passed in</returns>
        public byte[] Decode(byte[] data)
        {
            return data;
        }
    }
}
using Org.Apache.Reef.Tang.Annotations;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Codec factory whose product is always a <see cref="ByteCodec"/>.
    /// </summary>
    public class ByteCodecFactory : ICodecFactory
    {
        /// <summary>
        /// Creates the factory. Marked [Inject] so it can be constructed by injection.
        /// </summary>
        [Inject]
        public ByteCodecFactory()
        {
        }

        /// <summary>
        /// Creates a new ByteCodec on every call.
        /// </summary>
        /// <returns>A fresh ByteCodec instance, typed as object</returns>
        public object Create()
        {
            ByteCodec codec = new ByteCodec();
            return codec;
        }
    }
}
using System;
using System.IO;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Performs low level network IO operations between hosts
    /// </summary>
    /// <remarks>
    /// Each message is framed by GenerateMessageBuffer as:
    ///   4 bytes - payload length + 4, always big-endian;
    ///   4 bytes - payload length, in the byte order BitConverter uses on this host;
    ///   N bytes - the payload.
    /// The read path decodes the two header words the same way.
    /// NOTE(review): the second word is not byte-order normalized, so two hosts
    /// of different endianness would disagree on it. Confirm against the peer
    /// implementation before changing the wire format.
    /// </remarks>
    public class Channel
    {
        private readonly NetworkStream _stream;

        /// <summary>
        /// Constructs a new Channel with the connected NetworkStream.
        /// </summary>
        /// <param name="stream">The connected stream</param>
        /// <exception cref="ArgumentNullException">stream is null</exception>
        public Channel(NetworkStream stream)
        {
            if (stream == null)
            {
                throw new ArgumentNullException("stream");
            }

            _stream = stream;
        }

        /// <summary>
        /// Sends a message to the connected client synchronously
        /// </summary>
        /// <param name="message">The message to send</param>
        /// <exception cref="ArgumentNullException">message is null</exception>
        public void Write(byte[] message)
        {
            if (message == null)
            {
                throw new ArgumentNullException("message");
            }

            byte[] messageBuffer = GenerateMessageBuffer(message);
            _stream.Write(messageBuffer, 0, messageBuffer.Length);
        }

        /// <summary>
        /// Sends a message to the connected client asynchronously
        /// </summary>
        /// <param name="message">The message to send</param>
        /// <param name="token">The cancellation token</param>
        /// <returns>The awaitable write task</returns>
        /// <exception cref="ArgumentNullException">
        /// message is null; reported through the returned task
        /// </exception>
        public async Task WriteAsync(byte[] message, CancellationToken token)
        {
            // Same guard as Write; previously a null message failed later with
            // a NullReferenceException inside GenerateMessageBuffer.
            if (message == null)
            {
                throw new ArgumentNullException("message");
            }

            byte[] messageBuffer = GenerateMessageBuffer(message);
            await _stream.WriteAsync(messageBuffer, 0, messageBuffer.Length, token).ConfigureAwait(false);
        }

        /// <summary>
        /// Reads an incoming message as a byte array synchronously.
        /// The message length is taken from the two four-byte header words.
        /// </summary>
        /// <returns>
        /// The byte array message, or null if the header could not be read,
        /// the header announced a zero length, or the stream ended before the
        /// whole payload arrived.
        /// </returns>
        public byte[] Read()
        {
            int payloadLength = ReadMessageLength();
            if (payloadLength == 0)
            {
                return null;
            }

            return ReadBytes(payloadLength);
        }

        /// <summary>
        /// Reads an incoming message as a byte array asynchronously.
        /// The message length is taken from the two four-byte header words.
        /// </summary>
        /// <param name="token">The cancellation token</param>
        /// <returns>
        /// The byte array message, or null if the header could not be read,
        /// the header announced a zero length, or the stream ended before the
        /// whole payload arrived.
        /// </returns>
        public async Task<byte[]> ReadAsync(CancellationToken token)
        {
            int payloadLength = await GetMessageLengthAsync(token).ConfigureAwait(false);
            if (payloadLength == 0)
            {
                return null;
            }

            return await ReadBytesAsync(payloadLength, token).ConfigureAwait(false);
        }

        /// <summary>
        /// Helper method to read the specified number of bytes from the network stream.
        /// </summary>
        /// <param name="bytesToRead">The number of bytes to read</param>
        /// <returns>The byte[] read from the network stream with the requested
        /// number of bytes, otherwise null if the stream ended first.
        /// </returns>
        private byte[] ReadBytes(int bytesToRead)
        {
            int totalBytesRead = 0;
            byte[] buffer = new byte[bytesToRead];

            // A single Read may deliver fewer bytes than requested, so keep
            // reading until the buffer is full.
            while (totalBytesRead < bytesToRead)
            {
                int bytesRead = _stream.Read(buffer, totalBytesRead, bytesToRead - totalBytesRead);
                if (bytesRead == 0)
                {
                    // No more data: the connection was closed
                    return null;
                }

                totalBytesRead += bytesRead;
            }

            return buffer;
        }

        /// <summary>
        /// Helper method to read the specified number of bytes from the network stream.
        /// </summary>
        /// <param name="bytesToRead">The number of bytes to read</param>
        /// <param name="token">The cancellation token</param>
        /// <returns>The byte[] read from the network stream with the requested
        /// number of bytes, otherwise null if the stream ended first.
        /// </returns>
        private async Task<byte[]> ReadBytesAsync(int bytesToRead, CancellationToken token)
        {
            int bytesRead = 0;
            byte[] buffer = new byte[bytesToRead];

            // A single ReadAsync may deliver fewer bytes than requested, so keep
            // reading until the buffer is full.
            while (bytesRead < bytesToRead)
            {
                int amountRead = await _stream.ReadAsync(buffer, bytesRead, bytesToRead - bytesRead, token).ConfigureAwait(false);
                if (amountRead == 0)
                {
                    // No more data: the connection was closed
                    return null;
                }

                bytesRead += amountRead;
            }

            return buffer;
        }

        /// <summary>
        /// Generates the payload buffer containing the message along
        /// with a header indicating the message length.
        /// </summary>
        /// <param name="message">The message to send</param>
        /// <returns>The payload buffer</returns>
        private byte[] GenerateMessageBuffer(byte[] message)
        {
            // First header word: payload length plus the size of the second
            // header word, forced to big-endian.
            byte[] lengthBuffer1 = BitConverter.GetBytes(message.Length + 4);
            // Second header word: payload length, left in host byte order.
            byte[] lengthBuffer2 = BitConverter.GetBytes(message.Length);
            if (BitConverter.IsLittleEndian)
            {
                Array.Reverse(lengthBuffer1);
            }

            int len = lengthBuffer1.Length + lengthBuffer2.Length + message.Length;
            byte[] messageBuffer = new byte[len];

            int bytesCopied = 0;
            bytesCopied += CopyBytes(lengthBuffer1, messageBuffer, 0);
            bytesCopied += CopyBytes(lengthBuffer2, messageBuffer, bytesCopied);
            CopyBytes(message, messageBuffer, bytesCopied);

            return messageBuffer;
        }

        /// <summary>
        /// Reads the two four-byte header words from the stream and decodes
        /// them to get the message length in bytes
        /// </summary>
        /// <returns>The incoming message's length in bytes, or 0 if the header
        /// could not be read or announced an empty frame</returns>
        private int ReadMessageLength()
        {
            if (DecodeFrameLength(ReadBytes(sizeof(int))) == 0)
            {
                return 0;
            }

            return DecodePayloadLength(ReadBytes(sizeof(int)));
        }

        /// <summary>
        /// Reads the two four-byte header words from the stream and decodes
        /// them to get the message length in bytes
        /// </summary>
        /// <param name="token">The cancellation token</param>
        /// <returns>The incoming message's length in bytes, or 0 if the header
        /// could not be read or announced an empty frame</returns>
        private async Task<int> GetMessageLengthAsync(CancellationToken token)
        {
            byte[] frameHeader = await ReadBytesAsync(sizeof(int), token).ConfigureAwait(false);
            if (DecodeFrameLength(frameHeader) == 0)
            {
                return 0;
            }

            // Read the second header word asynchronously as well. The previous
            // synchronous ReadBytes call blocked the calling thread and ignored
            // the cancellation token.
            byte[] payloadHeader = await ReadBytesAsync(sizeof(int), token).ConfigureAwait(false);
            return DecodePayloadLength(payloadHeader);
        }

        /// <summary>
        /// Decodes the first (big-endian) header word.
        /// </summary>
        /// <param name="lenBytes">The four header bytes, or null if they could
        /// not be read. The array is reversed in place on little-endian hosts.</param>
        /// <returns>The decoded value, or 0 when lenBytes is null</returns>
        private static int DecodeFrameLength(byte[] lenBytes)
        {
            if (lenBytes == null)
            {
                return 0;
            }
            if (BitConverter.IsLittleEndian)
            {
                Array.Reverse(lenBytes);
            }

            return BitConverter.ToInt32(lenBytes, 0);
        }

        /// <summary>
        /// Decodes the second (host byte order) header word.
        /// </summary>
        /// <param name="msgLength">The four header bytes, or null if they could
        /// not be read</param>
        /// <returns>The payload length, or 0 when msgLength is null</returns>
        private static int DecodePayloadLength(byte[] msgLength)
        {
            return (msgLength == null) ? 0 : BitConverter.ToInt32(msgLength, 0);
        }

        /// <summary>
        /// Copies the entire source buffer into the destination buffer at the specified
        /// destination offset.
        /// </summary>
        /// <param name="source">The source buffer to be copied</param>
        /// <param name="dest">The destination buffer to copy to</param>
        /// <param name="destOffset">The offset at the destination buffer to begin
        /// copying.</param>
        /// <returns>The number of bytes copied</returns>
        private int CopyBytes(byte[] source, byte[] dest, int destOffset)
        {
            Buffer.BlockCopy(source, 0, dest, destOffset, source.Length);
            return source.Length;
        }
    }
}
using Org.Apache.Reef.Utilities.Diagnostics;
using Org.Apache.Reef.Utilities.Logging;
using Org.Apache.Reef.Wake.Util;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Reactive;
using System.Threading.Tasks;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Manages incoming and outgoing messages between remote hosts.
    /// Outgoing messages go through per-endpoint observers that are cached by this
    /// manager; incoming messages are dispatched through an ObserverContainer.
    /// </summary>
    public class DefaultRemoteManager<T> : IRemoteManager<T>
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(DefaultRemoteManager<T>));

        // Receives incoming events and holds the observers registered through RegisterObserver.
        private ObserverContainer<T> _observerContainer;

        // Null when the manager was built with the codec-only constructor, and after Dispose.
        private TransportServer<IRemoteEvent<T>> _server;

        // One ProxyObserver per remote endpoint requested through GetRemoteObserver.
        // NOTE(review): this is a plain Dictionary and no locking is visible in this class;
        // confirm that GetRemoteObserver/Dispose are not called concurrently.
        private Dictionary<IPEndPoint, ProxyObserver> _cachedClients;

        private ICodec<IRemoteEvent<T>> _codec;

        /// <summary>
        /// Constructs a DefaultRemoteManager listening on the specified address and any
        /// available port (port 0 is passed on to the three-argument constructor).
        /// </summary>
        /// <param name="localAddress">The address to listen on</param>
        /// <param name="codec">The codec used for serializing messages</param>
        public DefaultRemoteManager(IPAddress localAddress, ICodec<T> codec) : this(localAddress, 0, codec)
        {
        }

        /// <summary>
        /// Constructs a DefaultRemoteManager listening on the specified IPEndPoint.
        /// </summary>
        /// <param name="localEndpoint">The endpoint to listen on</param>
        /// <param name="codec">The codec used for serializing messages</param>
        /// <exception cref="ArgumentNullException">localEndpoint or codec is null</exception>
        /// <exception cref="ArgumentException">The port of localEndpoint is negative</exception>
        public DefaultRemoteManager(IPEndPoint localEndpoint, ICodec<T> codec)
        {
            if (localEndpoint == null)
            {
                throw new ArgumentNullException("localEndpoint");
            }
            if (localEndpoint.Port < 0)
            {
                throw new ArgumentException("Listening port must be greater than or equal to zero");
            }
            if (codec == null)
            {
                throw new ArgumentNullException("codec");
            }

            InitializeMessaging(codec);
            StartListening(localEndpoint);
        }

        /// <summary>
        /// Constructs a DefaultRemoteManager listening on the specified address and port.
        /// </summary>
        /// <param name="localAddress">The address to listen on</param>
        /// <param name="port">The port to listen on</param>
        /// <param name="codec">The codec used for serializing messages</param>
        /// <exception cref="ArgumentNullException">localAddress or codec is null</exception>
        /// <exception cref="ArgumentException">port is negative</exception>
        public DefaultRemoteManager(IPAddress localAddress, int port, ICodec<T> codec)
        {
            if (localAddress == null)
            {
                throw new ArgumentNullException("localAddress");
            }
            if (port < 0)
            {
                throw new ArgumentException("Listening port must be greater than or equal to zero");
            }
            if (codec == null)
            {
                throw new ArgumentNullException("codec");
            }

            InitializeMessaging(codec);
            StartListening(new IPEndPoint(localAddress, port));
        }

        /// <summary>
        /// Constructs a DefaultRemoteManager. Does not listen for incoming messages:
        /// no transport server is created, and LocalEndpoint is set to
        /// NetworkUtils.LocalIPAddress with port 0.
        /// </summary>
        /// <param name="codec">The codec used for serializing messages</param>
        /// <exception cref="ArgumentNullException">codec is null</exception>
        public DefaultRemoteManager(ICodec<T> codec)
        {
            using (LOGGER.LogFunction("DefaultRemoteManager::DefaultRemoteManager"))
            {
                if (codec == null)
                {
                    throw new ArgumentNullException("codec");
                }

                InitializeMessaging(codec);

                LocalEndpoint = new IPEndPoint(NetworkUtils.LocalIPAddress, 0);
                Identifier = new SocketRemoteIdentifier(LocalEndpoint);
            }
        }

        /// <summary>
        /// Gets the RemoteIdentifier for the DefaultRemoteManager
        /// </summary>
        public IRemoteIdentifier Identifier { get; private set; }

        /// <summary>
        /// Gets the local IPEndPoint for the DefaultRemoteManager
        /// </summary>
        public IPEndPoint LocalEndpoint { get; private set; }

        /// <summary>
        /// Returns an IObserver used to send messages to the remote host identified
        /// by the specified RemoteEventEndPoint.
        /// </summary>
        /// <param name="remoteEndpoint">The endpoint of the remote host; its Id must be
        /// a SocketRemoteIdentifier</param>
        /// <returns>An IObserver used to send messages to the remote host</returns>
        /// <exception cref="ArgumentNullException">remoteEndpoint is null</exception>
        /// <exception cref="ArgumentException">The endpoint Id is not a SocketRemoteIdentifier</exception>
        public IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> remoteEndpoint)
        {
            if (remoteEndpoint == null)
            {
                throw new ArgumentNullException("remoteEndpoint");
            }

            SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier;
            if (id == null)
            {
                throw new ArgumentException("ID not supported");
            }

            return GetRemoteObserver(id.Addr);
        }

        /// <summary>
        /// Returns an IObserver used to send messages to the remote host at
        /// the specified IPEndpoint. The observer is created on the first request for
        /// an endpoint and the same instance is returned for later requests.
        /// </summary>
        /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param>
        /// <returns>An IObserver used to send messages to the remote host</returns>
        /// <exception cref="ArgumentNullException">remoteEndpoint is null</exception>
        public IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint)
        {
            if (remoteEndpoint == null)
            {
                throw new ArgumentNullException("remoteEndpoint");
            }

            ProxyObserver remoteObserver;
            if (!_cachedClients.TryGetValue(remoteEndpoint, out remoteObserver))
            {
                TransportClient<IRemoteEvent<T>> client =
                    new TransportClient<IRemoteEvent<T>>(remoteEndpoint, _codec, _observerContainer);

                remoteObserver = new ProxyObserver(client);
                _cachedClients[remoteEndpoint] = remoteObserver;
            }

            return remoteObserver;
        }

        /// <summary>
        /// Registers an IObserver used to handle incoming messages from the remote host
        /// identified by the specified RemoteEventEndPoint.
        /// The IDisposable that is returned can be used to unregister the IObserver.
        /// </summary>
        /// <param name="remoteEndpoint">The endpoint of the remote host; its Id must be
        /// a SocketRemoteIdentifier</param>
        /// <param name="observer">The IObserver to handle incoming messages</param>
        /// <returns>An IDisposable used to unregister the observer with</returns>
        /// <exception cref="ArgumentNullException">remoteEndpoint or observer is null</exception>
        /// <exception cref="ArgumentException">The endpoint Id is not a SocketRemoteIdentifier</exception>
        public IDisposable RegisterObserver(RemoteEventEndPoint<T> remoteEndpoint, IObserver<T> observer)
        {
            if (remoteEndpoint == null)
            {
                throw new ArgumentNullException("remoteEndpoint");
            }

            SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier;
            if (id == null)
            {
                throw new ArgumentException("ID not supported");
            }

            // The observer null check is performed by the IPEndPoint overload.
            return RegisterObserver(id.Addr, observer);
        }

        /// <summary>
        /// Registers an IObserver used to handle incoming messages from the remote host
        /// at the specified IPEndPoint.
        /// The IDisposable that is returned can be used to unregister the IObserver.
        /// </summary>
        /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
        /// <param name="observer">The IObserver to handle incoming messages</param>
        /// <returns>An IDisposable used to unregister the observer with</returns>
        /// <exception cref="ArgumentNullException">remoteEndpoint or observer is null</exception>
        public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer)
        {
            if (remoteEndpoint == null)
            {
                throw new ArgumentNullException("remoteEndpoint");
            }
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            return _observerContainer.RegisterObserver(remoteEndpoint, observer);
        }

        /// <summary>
        /// Registers an IObserver of IRemoteMessage used to handle incoming messages.
        /// Unlike the other overloads, no remote endpoint is supplied here.
        /// The IDisposable that is returned can be used to unregister the IObserver.
        /// </summary>
        /// <param name="observer">The IObserver to handle incoming messages</param>
        /// <returns>An IDisposable used to unregister the observer with</returns>
        /// <exception cref="ArgumentNullException">observer is null</exception>
        public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            return _observerContainer.RegisterObserver(observer);
        }

        /// <summary>
        /// Release all resources for the DefaultRemoteManager: every cached client
        /// observer and, if one was started, the transport server.
        /// Calling Dispose again after a successful call is a no-op.
        /// </summary>
        public void Dispose()
        {
            foreach (ProxyObserver cachedClient in _cachedClients.Values)
            {
                cachedClient.Dispose();
            }

            // Forget the disposed observers so that a repeated Dispose does not dispose
            // them a second time and GetRemoteObserver never returns a disposed one.
            _cachedClients.Clear();

            if (_server != null)
            {
                _server.Dispose();
                _server = null;
            }
        }

        /// <summary>
        /// Creates the state shared by every constructor: the observer container,
        /// the remote event codec wrapping the user codec, and the empty client cache.
        /// </summary>
        /// <param name="codec">The non-null codec used for serializing messages</param>
        private void InitializeMessaging(ICodec<T> codec)
        {
            _observerContainer = new ObserverContainer<T>();
            _codec = new RemoteEventCodec<T>(codec);
            _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
        }

        /// <summary>
        /// Creates and runs the transport server on the given endpoint, then publishes
        /// the endpoint reported by the server through LocalEndpoint and Identifier.
        /// Must be called after InitializeMessaging.
        /// </summary>
        /// <param name="localEndpoint">The endpoint to listen on</param>
        private void StartListening(IPEndPoint localEndpoint)
        {
            // Begin to listen for incoming messages
            _server = new TransportServer<IRemoteEvent<T>>(localEndpoint, _observerContainer, _codec);
            _server.Run();

            // Read the endpoint back from the server rather than reusing the argument.
            LocalEndpoint = _server.LocalEndpoint;
            Identifier = new SocketRemoteIdentifier(LocalEndpoint);
        }

        /// <summary>
        /// Observer to send messages to connected remote host
        /// </summary>
        private class ProxyObserver : IObserver<T>, IDisposable
        {
            private TransportClient<IRemoteEvent<T>> _client;

            // Number of messages passed to OnNext so far; used as the sequence number.
            private int _messageCount;

            /// <summary>
            /// Create new ProxyObserver
            /// </summary>
            /// <param name="client">The connected transport client used to send
            /// messages to remote host</param>
            public ProxyObserver(TransportClient<IRemoteEvent<T>> client)
            {
                _client = client;
                _messageCount = 0;
            }

            /// <summary>
            /// Send the message to the remote host, wrapped in a RemoteEvent that carries
            /// the link's local and remote endpoints, the sink name "default" and the
            /// current message count as its sequence number.
            /// </summary>
            /// <param name="message">The message to send</param>
            public void OnNext(T message)
            {
                IRemoteEvent<T> remoteEvent = new RemoteEvent<T>(_client.Link.LocalEndpoint, _client.Link.RemoteEndpoint, message)
                {
                    Sink = "default",
                    Sequence = _messageCount
                };

                // The counter advances before the send, so it also advances if Send throws.
                _messageCount++;
                _client.Send(remoteEvent);
            }

            /// <summary>
            /// Close underlying transport client
            /// </summary>
            public void Dispose()
            {
                _client.Dispose();
            }

            /// <summary>
            /// Not supported by this observer.
            /// </summary>
            /// <param name="error">Ignored</param>
            /// <exception cref="NotImplementedException">Always thrown</exception>
            public void OnError(Exception error)
            {
                throw new NotImplementedException();
            }

            /// <summary>
            /// Not supported by this observer.
            /// </summary>
            /// <exception cref="NotImplementedException">Always thrown</exception>
            public void OnCompleted()
            {
                throw new NotImplementedException();
            }
        }
    }
}
namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Immutable IRemoteMessage implementation that pairs a message payload with
    /// the IRemoteIdentifier supplied at construction time.
    /// </summary>
    class DefaultRemoteMessage<T> : IRemoteMessage<T>
    {
        private readonly IRemoteIdentifier _identifier;
        private readonly T _message;

        /// <summary>
        /// Creates a message holding the given identifier and payload.
        /// Neither argument is validated.
        /// </summary>
        /// <param name="id">The identifier stored in Identifier</param>
        /// <param name="message">The payload stored in Message</param>
        public DefaultRemoteMessage(IRemoteIdentifier id, T message)
        {
            _message = message;
            _identifier = id;
        }

        /// <summary>
        /// Gets the identifier passed to the constructor.
        /// </summary>
        public IRemoteIdentifier Identifier
        {
            get { return _identifier; }
        }

        /// <summary>
        /// Gets the payload passed to the constructor.
        /// </summary>
        public T Message
        {
            get { return _message; }
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Class to compare two IPEndPoint objects.
    /// A port of 0 on either side acts as a wildcard: in that case only the
    /// addresses are compared.
    /// </summary>
    internal class IPEndPointComparer : IEqualityComparer<IPEndPoint>
    {
        /// <summary>
        /// Determines whether two endpoints are considered equal.
        /// </summary>
        /// <param name="x">The first endpoint, may be null</param>
        /// <param name="y">The second endpoint, may be null</param>
        /// <returns>
        /// True when both arguments are the same reference (including both null),
        /// when either port is 0 and the addresses are equal, or when
        /// IPEndPoint.Equals reports equality; false otherwise.
        /// </returns>
        public bool Equals(IPEndPoint x, IPEndPoint y)
        {
            if (ReferenceEquals(x, y))
            {
                return true;
            }
            if (x == null || y == null)
            {
                return false;
            }

            // If either port is 0, don't check port
            if (x.Port == 0 || y.Port == 0)
            {
                return x.Address.Equals(y.Address);
            }

            return x.Equals(y);
        }

        /// <summary>
        /// Returns a hash code based on the address only. The port is left out so
        /// that endpoints treated as equal through the port-0 wildcard hash alike.
        /// </summary>
        /// <param name="obj">The endpoint to hash</param>
        /// <returns>The hash code of the endpoint's address</returns>
        /// <exception cref="ArgumentNullException">obj is null</exception>
        public int GetHashCode(IPEndPoint obj)
        {
            // Report a null argument explicitly instead of failing with a
            // NullReferenceException on obj.Address.
            if (obj == null)
            {
                throw new ArgumentNullException("obj");
            }

            return obj.Address.GetHashCode();
        }
    }
}
using Org.Apache.Reef.Tang.Annotations;
using System;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Codec that converts a 32-bit integer to and from its byte representation
    /// using BitConverter, i.e. in the byte order of the machine it runs on.
    /// </summary>
    public class IntCodec : ICodec<int>
    {
        /// <summary>
        /// Creates a new IntCodec. Marked with Inject so it can be instantiated by Tang.
        /// </summary>
        [Inject]
        public IntCodec()
        {
        }

        /// <summary>
        /// Serializes the integer into a four-byte array.
        /// </summary>
        /// <param name="obj">The integer to encode</param>
        /// <returns>The bytes produced by BitConverter.GetBytes</returns>
        public byte[] Encode(int obj)
        {
            byte[] encoded = BitConverter.GetBytes(obj);
            return encoded;
        }

        /// <summary>
        /// Deserializes an integer from the first four bytes of the array;
        /// any bytes after those are ignored.
        /// </summary>
        /// <param name="data">The bytes to decode</param>
        /// <returns>The integer read by BitConverter.ToInt32 at offset 0</returns>
        public int Decode(byte[] data)
        {
            const int offset = 0;
            int decoded = BitConverter.ToInt32(data, offset);
            return decoded;
        }
    }
}
