http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/Link.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/Link.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/Link.cs new file mode 100644 index 0000000..d5b987a --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/Link.cs @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.Reef.Utilities.Diagnostics; +using Org.Apache.Reef.Utilities.Logging; +using Org.Apache.Reef.Tang.Exceptions; +using Org.Apache.Reef.Wake.Util; + +namespace Org.Apache.Reef.Wake.Remote.Impl +{ + /// <summary> + /// Represents an open connection between remote hosts + /// </summary> + public class Link<T> : ILink<T> + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(Link<T>)); + + private IPEndPoint _localEndpoint; + private ICodec<T> _codec; + private Channel _channel; + private bool _disposed; + + /// <summary> + /// Constructs a Link object. + /// Connects to the specified remote endpoint. + /// </summary> + /// <param name="remoteEndpoint">The remote endpoint to connect to</param> + /// <param name="codec">The codec for serializing messages</param> + public Link(IPEndPoint remoteEndpoint, ICodec<T> codec) + { + if (remoteEndpoint == null) + { + throw new ArgumentNullException("remoteEndpoint"); + } + if (codec == null) + { + throw new ArgumentNullException("codec"); + } + + Client = new TcpClient(); + Client.Connect(remoteEndpoint); + + _codec = codec; + _channel = new Channel(Client.GetStream()); + _localEndpoint = GetLocalEndpoint(); + _disposed = false; + } + + /// <summary> + /// Constructs a Link object. + /// Uses the already connected TcpClient. + /// </summary> + /// <param name="client">The already connected client</param> + /// <param name="codec">The encoder and decoder</param> + public Link(TcpClient client, ICodec<T> codec) + { + if (client == null) + { + throw new ArgumentNullException("client"); + } + if (codec == null) + { + throw new ArgumentNullException("codec"); + } + + Client = client; + _codec = codec; + _channel = new Channel(Client.GetStream()); + _localEndpoint = GetLocalEndpoint(); + _disposed = false; + } + + /// <summary> + /// Returns the local socket address + /// </summary> + public IPEndPoint LocalEndpoint + { + get { return _localEndpoint; } + } + + /// <summary> + /// Returns the remote socket address + /// </summary> + public IPEndPoint RemoteEndpoint + { + get { return (IPEndPoint) Client.Client.RemoteEndPoint; } + } + + /// <summary> + /// Gets the underlying TcpClient + /// </summary> + public TcpClient Client { get; private set; } + + /// <summary> + /// Writes the message to the remote host + /// </summary> + /// <param name="value">The data to write</param> + public void Write(T value) + { + if (value == null) + { + throw new ArgumentNullException("value"); + } + if (_disposed) + { + Exceptions.Throw(new IllegalStateException("Link has been closed."), LOGGER); + } + + byte[] message = _codec.Encode(value); + _channel.Write(message); + } + + /// <summary> + /// Writes the value to this link asynchronously + /// </summary> + /// <param name="value">The data to write</param> + /// <param name="token">The cancellation token</param> + public async Task WriteAsync(T value, CancellationToken token) + { + if (_disposed) + { + Exceptions.Throw(new IllegalStateException("Link has been closed."), LOGGER); + } + + byte[] message = _codec.Encode(value); + await _channel.WriteAsync(message, token); + } + + /// <summary> + /// Reads the value from the link synchronously + /// </summary> + public T Read() + { + if (_disposed) + { + Exceptions.Throw(new IllegalStateException("Link has been disposed."), LOGGER); + } + + byte[] message = _channel.Read(); + return (message == null) ? default(T) : _codec.Decode(message); + } + + /// <summary> + /// Reads the value from the link asynchronously + /// </summary> + /// <param name="token">The cancellation token</param> + public async Task<T> ReadAsync(CancellationToken token) + { + if (_disposed) + { + Exceptions.Throw(new IllegalStateException("Link has been disposed."), LOGGER); + } + + byte[] message = await _channel.ReadAsync(token); + return (message == null) ? default(T) : _codec.Decode(message); + } + + /// <summary> + /// Close the client connection + /// </summary> + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + /// <summary> + /// Subclasses of Links should overwrite this to handle disposing + /// of the link + /// </summary> + /// <param name="disposing">To dispose or not</param> + public virtual void Dispose(bool disposing) + { + if (_disposed) + { + return; + } + + if (disposing) + { + try + { + Client.GetStream().Close(); + } + catch (InvalidOperationException) + { + LOGGER.Log(Level.Warning, "failed to close stream on a non-connected socket."); + } + + Client.Close(); + } + _disposed = true; + } + + /// <summary> + /// Overrides Equals. Two Link objects are equal if they are connected + /// to the same remote endpoint. + /// </summary> + /// <param name="obj">The object to compare</param> + /// <returns>True if the object is equal to this Link, otherwise false</returns> + public override bool Equals(object obj) + { + Link<T> other = obj as Link<T>; + if (other == null) + { + return false; + } + + return other.RemoteEndpoint.Equals(RemoteEndpoint); + } + + /// <summary> + /// Gets the hash code for the Link object. + /// </summary> + /// <returns>The object's hash code</returns> + public override int GetHashCode() + { + return RemoteEndpoint.GetHashCode(); + } + + /// <summary> + /// Discovers the IPEndpoint for the current machine. + /// </summary> + /// <returns>The local IPEndpoint</returns> + private IPEndPoint GetLocalEndpoint() + { + IPAddress address = NetworkUtils.LocalIPAddress; + int port = ((IPEndPoint) Client.Client.LocalEndPoint).Port; + return new IPEndPoint(address, port); + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiCodec.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiCodec.cs new file mode 100644 index 0000000..2791eb3 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiCodec.cs @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using Org.Apache.Reef.Wake.Remote; +using Org.Apache.Reef.Wake.Remote.Impl; + +namespace Org.Apache.Reef.Wake.Remote.Impl +{ + /// <summary> + /// Codec that can encode and decode a class depending on the class type. + /// </summary> + public class MultiCodec<T> : ICodec<T> + { + private readonly MultiEncoder<T> _encoder; + + private readonly MultiDecoder<T> _decoder; + + /// <summary> + /// Constructs a new MultiCodec object. + /// </summary> + public MultiCodec() + { + _encoder = new MultiEncoder<T>(); + _decoder = new MultiDecoder<T>(); + } + + /// <summary> + /// Register a codec to be used when encoding/decoding objects of this type. + /// </summary> + /// <typeparam name="U">The type of codec</typeparam> + /// <param name="codec">The codec to use when encoding/decoding + /// objects of this type</param> + public void Register<U>(ICodec<U> codec) where U : T + { + _encoder.Register(codec); + _decoder.Register(codec); + } + + /// <summary> + /// Register a codec to be used when encoding/decoding objects of this type. + /// </summary> + /// <typeparam name="U">The type of codec</typeparam> + /// <param name="codec">The codec to use when encoding/decoding + /// objects of this type</param> + /// <param name="name">The name of the class to encode/decode</param> + public void Register<U>(ICodec<U> codec, string name) where U : T + { + _encoder.Register(codec, name); + _decoder.Register(codec, name); + } + + /// <summary> + /// Encodes an object with the appropriate encoding or null if it cannot + /// be encoded. + /// </summary> + /// <param name="obj">Data to encode</param> + public byte[] Encode(T obj) + { + return _encoder.Encode(obj); + } + + /// <summary> + /// Decodes byte array into the appripriate object type. + /// </summary> + /// <param name="data">Data to be decoded</param> + public T Decode(byte[] data) + { + return _decoder.Decode(data); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiDecoder.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiDecoder.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiDecoder.cs new file mode 100644 index 0000000..789e226 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiDecoder.cs @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Reflection; +using Org.Apache.Reef.Utilities.Diagnostics; +using Org.Apache.Reef.Utilities.Logging; +using Org.Apache.Reef.Wake.Remote; +using Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos; + +namespace Org.Apache.Reef.Wake.Remote.Impl +{ + /// <summary> + /// Decoder using the WakeTuple protocol buffer + /// (class name and bytes) + /// </summary> + public class MultiDecoder<T> : IDecoder<T> + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(MultiDecoder<T>)); + private Dictionary<Type, object> _decoderMap; + private Dictionary<string, Type> _nameMap; + + /// <summary> + /// Constructs a decoder that decodes bytes based on the class type + /// </summary> + public MultiDecoder() + { + _decoderMap = new Dictionary<Type, object>(); + _nameMap = new Dictionary<string, Type>(); + } + + /// <summary> + /// Register the decoder for objects of type U + /// </summary> + /// <typeparam name="U">The type of decoder to use when decoding + /// objects of this type</typeparam> + /// <param name="decoder">The decoder to use when decoding + /// objects of this type</param> + public void Register<U>(IDecoder<U> decoder) where U : T + { + Type type = typeof(U); + _decoderMap[type] = decoder; + _nameMap[type.ToString()] = type; + } + + /// <summary> + /// Register the decoder for objects of type U + /// </summary> + /// <typeparam name="U">The type of decoder to use when decoding + /// objects of this type</typeparam> + /// <param name="decoder">The decoder to use when decoding + /// objects of this type</param> + /// <param name="name">The name of the class to decode</param> + public void Register<U>(IDecoder<U> decoder, string name) where U : T + { + Type type = typeof(U); + _decoderMap[type] = decoder; + _nameMap[name] = type; + } + + /// <summary> + /// Decodes byte array according to the underlying object type. + /// </summary> + /// <param name="data">The data to decode</param> + public T Decode(byte[] data) + { + WakeTuplePBuf pbuf = WakeTuplePBuf.Deserialize(data); + if (pbuf == null) + { + return default(T); + } + + // Get object's class Type + Type type; + if (!_nameMap.TryGetValue(pbuf.className, out type)) + { + return default(T); + } + + // Get decoder for that type + object decoder; + if (!_decoderMap.TryGetValue(type, out decoder)) + { + Exceptions.Throw(new RemoteRuntimeException("Decoder for " + type + " not known."), LOGGER); + } + + // Invoke the decoder to decode the byte array + Type handlerType = typeof(IDecoder<>).MakeGenericType(new[] { type }); + MethodInfo info = handlerType.GetMethod("Decode"); + return (T) info.Invoke(decoder, new[] { (object) pbuf.data }); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiEncoder.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiEncoder.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiEncoder.cs new file mode 100644 index 0000000..cccf52f --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiEncoder.cs @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Reflection; +using Org.Apache.Reef.Utilities.Logging; +using Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos; + +namespace Org.Apache.Reef.Wake.Remote.Impl +{ + /// <summary> + /// Encoder using the WakeTuple protocol buffer + /// (class name and bytes) + /// </summary> + public class MultiEncoder<T> : IEncoder<T> + { + private static Logger _logger = Logger.GetLogger(typeof(MultiEncoder<>)); + private Dictionary<Type, object> _encoderMap; + private Dictionary<Type, string> _nameMap; + + /// <summary> + /// Constructs an encoder that encodes an object to bytes based on the class name + /// </summary> + public MultiEncoder() + { + _encoderMap = new Dictionary<Type, object>(); + _nameMap = new Dictionary<Type, string>(); + } + + public void Register<U>(IEncoder<U> encoder) where U : T + { + _encoderMap[typeof(U)] = encoder; + _nameMap[typeof(U)] = typeof(U).ToString(); + } + + public void Register<U>(IEncoder<U> encoder, string name) where U : T + { + _encoderMap[typeof(U)] = encoder; + _nameMap[typeof(U)] = name; + _logger.Log(Level.Verbose, "Registering name for " + name); + } + + /// <summary>Encodes an object to a byte array</summary> + /// <param name="obj"></param> + public byte[] Encode(T obj) + { + // Find encoder for object type + object encoder; + if (!_encoderMap.TryGetValue(obj.GetType(), out encoder)) + { + return null; + } + + // Invoke encoder for this type + Type handlerType = typeof(IEncoder<>).MakeGenericType(new[] { obj.GetType() }); + MethodInfo info = handlerType.GetMethod("Encode"); + byte[] data = (byte[]) info.Invoke(encoder, new[] { (object) obj }); + + // Serialize object type and object data into well known tuple + // To decode, deserialize the tuple, get object type, and look up the + // decoder for that type + string name = _nameMap[obj.GetType()]; + _logger.Log(Level.Verbose, "Encoding name for " + name); + WakeTuplePBuf pbuf = new WakeTuplePBuf { className = name, data = data }; + pbuf.className = name; + pbuf.data = data; + return pbuf.Serialize(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/ObserverContainer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/ObserverContainer.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/ObserverContainer.cs new file mode 100644 index 0000000..577cd95 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/ObserverContainer.cs @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Text; +using System.Threading.Tasks; +using Org.Apache.Reef.Utilities.Diagnostics; +using Org.Apache.Reef.Utilities.Logging; +using Org.Apache.Reef.Wake.RX.Impl; +using Org.Apache.Reef.Wake.Util; + +namespace Org.Apache.Reef.Wake.Remote.Impl +{ + /// <summary> + /// Stores registered IObservers for DefaultRemoteManager. + /// Can register and look up IObservers by remote IPEndPoint. + /// </summary> + internal class ObserverContainer<T> : IObserver<TransportEvent<IRemoteEvent<T>>> + { + private ConcurrentDictionary<IPEndPoint, IObserver<T>> _endpointMap; + private ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>> _typeMap; + private IObserver<T> _universalObserver; + + /// <summary> + /// Constructs a new ObserverContainer used to manage remote IObservers. + /// </summary> + public ObserverContainer() + { + _endpointMap = new ConcurrentDictionary<IPEndPoint, IObserver<T>>(new IPEndPointComparer()); + _typeMap = new ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>>(); + } + + /// <summary> + /// Registers an IObserver used to handle incoming messages from the remote host + /// at the specified IPEndPoint. + /// </summary> + /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param> + /// <param name="observer">The IObserver to handle incoming messages</param> + /// <returns>An IDisposable used to unregister the observer with</returns> + public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer) + { + if (remoteEndpoint.Address.Equals(IPAddress.Any)) + { + _universalObserver = observer; + return Disposable.Create(() => { _universalObserver = null; }); + } + + _endpointMap[remoteEndpoint] = observer; + return Disposable.Create(() => _endpointMap.TryRemove(remoteEndpoint, out observer)); + } + + /// <summary> + /// Registers an IObserver to handle incoming messages from a remote host + /// </summary> + /// <param name="observer">The IObserver to handle incoming messages</param> + /// <returns>An IDisposable used to unregister the observer with</returns> + public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer) + { + _typeMap[typeof(T)] = observer; + return Disposable.Create(() => _typeMap.TryRemove(typeof(T), out observer)); + } + + /// <summary> + /// Look up the IObserver for the registered IPEndPoint or event type + /// and execute the IObserver. + /// </summary> + /// <param name="transportEvent">The incoming remote event</param> + public void OnNext(TransportEvent<IRemoteEvent<T>> transportEvent) + { + IRemoteEvent<T> remoteEvent = transportEvent.Data; + remoteEvent.LocalEndPoint = transportEvent.Link.LocalEndpoint; + remoteEvent.RemoteEndPoint = transportEvent.Link.RemoteEndpoint; + T value = remoteEvent.Value; + bool handled = false; + + IObserver<T> observer1; + IObserver<IRemoteMessage<T>> observer2; + if (_universalObserver != null) + { + _universalObserver.OnNext(value); + handled = true; + } + if (_endpointMap.TryGetValue(remoteEvent.RemoteEndPoint, out observer1)) + { + // IObserver was registered by IPEndpoint + observer1.OnNext(value); + handled = true; + } + else if (_typeMap.TryGetValue(value.GetType(), out observer2)) + { + // IObserver was registered by event type + IRemoteIdentifier id = new SocketRemoteIdentifier(remoteEvent.RemoteEndPoint); + IRemoteMessage<T> remoteMessage = new DefaultRemoteMessage<T>(id, value); + observer2.OnNext(remoteMessage); + handled = true; + } + + if (!handled) + { + throw new WakeRuntimeException("Unrecognized Wake RemoteEvent message"); + } + } + + public void OnError(Exception error) + { + } + + public void OnCompleted() + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEvent.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEvent.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEvent.cs new file mode 100644 index 0000000..9e2fe2a --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEvent.cs @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Net; + +namespace Org.Apache.Reef.Wake.Remote.Impl +{ + public class RemoteEvent<T> : IRemoteEvent<T> + { + public RemoteEvent(IPEndPoint localEndPoint, IPEndPoint remoteEndPoint, string source, string sink, long seq, T value) + { + LocalEndPoint = localEndPoint; + RemoteEndPoint = remoteEndPoint; + Source = source; + Sink = sink; + Value = value; + Sequence = seq; + } + + public RemoteEvent(IPEndPoint localEndpoint, IPEndPoint remoteEndpoint, T value) + { + LocalEndPoint = localEndpoint; + RemoteEndPoint = remoteEndpoint; + Value = value; + } + + public RemoteEvent() + { + } + + public IPEndPoint LocalEndPoint { get; set; } + + public IPEndPoint RemoteEndPoint { get; set; } + + public string Source { get; set; } + + public string Sink { get; set; } + + public T Value { get; set; } + + public long Sequence { get; set; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventCodec.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventCodec.cs new file mode 100644 index 0000000..2c5b16d --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventCodec.cs @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.Reef.Wake.Remote.Impl +{ + internal class RemoteEventCodec<T> : ICodec<IRemoteEvent<T>> + { + private readonly RemoteEventEncoder<T> _encoder; + private readonly RemoteEventDecoder<T> _decoder; + + public RemoteEventCodec(ICodec<T> codec) + { + _encoder = new RemoteEventEncoder<T>(codec); + _decoder = new RemoteEventDecoder<T>(codec); + } + + public byte[] Encode(IRemoteEvent<T> obj) + { + return _encoder.Encode(obj); + } + + public IRemoteEvent<T> Decode(byte[] data) + { + return _decoder.Decode(data); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventDecoder.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventDecoder.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventDecoder.cs new file mode 100644 index 0000000..f9cfbc1 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventDecoder.cs @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos; + +namespace Org.Apache.Reef.Wake.Remote.Impl +{ + public class RemoteEventDecoder<T> : IDecoder<IRemoteEvent<T>> + { + private IDecoder<T> _decoder; + + public RemoteEventDecoder(IDecoder<T> decoder) + { + _decoder = decoder; + } + + public IRemoteEvent<T> Decode(byte[] data) + { + WakeMessagePBuf pbuf = WakeMessagePBuf.Deserialize(data); + return new RemoteEvent<T>(null, null, pbuf.source, pbuf.sink, pbuf.seq, _decoder.Decode(pbuf.data)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEncoder.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEncoder.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEncoder.cs new file mode 100644 index 0000000..432e688 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEncoder.cs @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos; + +namespace Org.Apache.Reef.Wake.Remote.Impl +{ + public class RemoteEventEncoder<T> : IEncoder<IRemoteEvent<T>> + { + private readonly IEncoder<T> _encoder; + + public RemoteEventEncoder(IEncoder<T> encoder) + { + _encoder = encoder; + } + + public byte[] Encode(IRemoteEvent<T> obj) + { + WakeMessagePBuf pbuf = new WakeMessagePBuf(); + pbuf.sink = obj.Sink; + pbuf.source = obj.Source; + pbuf.data = _encoder.Encode(obj.Value); + pbuf.seq = obj.Sequence; + return pbuf.Serialize(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEndpoint.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEndpoint.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEndpoint.cs new file mode 100644 index 0000000..10a048e --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEndpoint.cs @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Org.Apache.Reef.Wake.Remote.Impl +{ + public class RemoteEventEndPoint<T> + { + private IRemoteIdentifier _id; + + public RemoteEventEndPoint(IRemoteIdentifier id) + { + _id = id; + } + + public IRemoteIdentifier Id + { + get { return _id; } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/SocketRemoteIdentifier.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/SocketRemoteIdentifier.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/SocketRemoteIdentifier.cs new file mode 100644 index 0000000..2f402a8 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/SocketRemoteIdentifier.cs @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.Reef.Utilities.Diagnostics; +using Org.Apache.Reef.Utilities.Logging; +using System.Globalization; +using System.Net; +using System.Text; + +namespace Org.Apache.Reef.Wake.Remote.Impl +{ + /// <summary> + /// Remote identifier based on a socket address + /// </summary> + public class SocketRemoteIdentifier : IRemoteIdentifier + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(SocketRemoteIdentifier)); + private IPEndPoint _addr; + + public SocketRemoteIdentifier(IPEndPoint addr) + { + _addr = addr; + } + + public SocketRemoteIdentifier(string str) + { + int index = str.IndexOf(":", System.StringComparison.Ordinal); + if (index <= 0) + { + Exceptions.Throw(new RemoteRuntimeException("Invalid name " + str), LOGGER); + } + string host = str.Substring(0, index); + int port = int.Parse(str.Substring(index + 1), CultureInfo.InvariantCulture); + _addr = new IPEndPoint(IPAddress.Parse(host), port); + } + + public IPEndPoint Addr + { + get { return _addr; } + } + + public override int GetHashCode() + { + return _addr.GetHashCode(); + } + + public override bool Equals(object obj) + { + return _addr.Equals(((SocketRemoteIdentifier)obj).Addr); + } + + public override string ToString() + { + StringBuilder builder = new StringBuilder(); + builder.Append("socket://"); + builder.Append(_addr); + return builder.ToString(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/StringCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/StringCodec.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/StringCodec.cs new file mode 100644 index 0000000..4e25b18 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/StringCodec.cs @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Org.Apache.Reef.Tang.Annotations; + +namespace Org.Apache.Reef.Wake.Remote.Impl +{ + public class StringCodec : ICodec<string> + { + [Inject] + public StringCodec() + { + } + + public byte[] Encode(string obj) + { + return Encoding.ASCII.GetBytes(obj); + } + + public string Decode(byte[] data) + { + return Encoding.ASCII.GetString(data); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifier.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifier.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifier.cs new file mode 100644 index 0000000..31829cd --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifier.cs @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Org.Apache.Reef.Wake.Remote.Impl +{ + public class StringIdentifier : IIdentifier + { + private readonly string _str; + + public StringIdentifier(string s) + { + _str = s; + } + + public override int GetHashCode() + { + return _str.GetHashCode(); + } + + public override bool Equals(object o) + { + StringIdentifier other = o as StringIdentifier; + return other != null && _str.Equals(other._str); + } + + public override string ToString() + { + return _str; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifierFactory.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifierFactory.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifierFactory.cs new file mode 100644 index 0000000..b9bcb50 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifierFactory.cs @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.Reef.Tang.Annotations; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Org.Apache.Reef.Wake.Remote.Impl +{ + public class StringIdentifierFactory : IIdentifierFactory + { + [Inject] + public StringIdentifierFactory() + { + } + + public IIdentifier Create(string s) + { + return new StringIdentifier(s); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportClient.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportClient.cs new file mode 100644 index 0000000..848a915 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportClient.cs @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Net; +using System.Threading; +using System.Threading.Tasks; + +namespace Org.Apache.Reef.Wake.Remote.Impl +{ + /// <summary> + /// Establish connections to TransportServer for remote message passing + /// </summary> + public class TransportClient<T> : IDisposable + { + private ILink<T> _link; + private IObserver<TransportEvent<T>> _observer; + private CancellationTokenSource _cancellationSource; + private bool _disposed; + + /// <summary> + /// Construct a TransportClient. + /// Used to send messages to the specified remote endpoint. + /// </summary> + /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param> + /// <param name="codec">Codec to decode/encodec</param> + public TransportClient(IPEndPoint remoteEndpoint, ICodec<T> codec) + { + if (remoteEndpoint == null) + { + throw new ArgumentNullException("remoteEndpoint"); + } + if (codec == null) + { + throw new ArgumentNullException("codec"); + } + + _link = new Link<T>(remoteEndpoint, codec); + _cancellationSource = new CancellationTokenSource(); + _disposed = false; + } + + /// <summary> + /// Construct a TransportClient. + /// Used to send messages to the specified remote endpoint. + /// </summary> + /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param> + /// <param name="codec">Codec to decode/encodec</param> + /// <param name="observer">Callback used when receiving responses from remote host</param> + public TransportClient(IPEndPoint remoteEndpoint, + ICodec<T> codec, + IObserver<TransportEvent<T>> observer) + : this(remoteEndpoint, codec) + { + _observer = observer; + Task.Run(() => ResponseLoop()); + } + + /// <summary> + /// Gets the underlying transport link. + /// </summary> + public ILink<T> Link + { + get { return _link; } + } + + /// <summary> + /// Send the remote message. + /// </summary> + /// <param name="message">The message to send</param> + public void Send(T message) + { + if (message == null) + { + throw new ArgumentNullException("message"); + } + + _link.Write(message); + } + + /// <summary> + /// Close all opened connections + /// </summary> + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected void Dispose(bool disposing) + { + if (!_disposed && disposing) + { + _link.Dispose(); + _disposed = true; + } + } + + /// <summary> + /// Continually read responses from remote host + /// </summary> + private async Task ResponseLoop() + { + while (!_cancellationSource.IsCancellationRequested) + { + T message = await _link.ReadAsync(_cancellationSource.Token); + if (message == null) + { + break; + } + + TransportEvent<T> transportEvent = new TransportEvent<T>(message, _link); + _observer.OnNext(transportEvent); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportEvent.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportEvent.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportEvent.cs new file mode 100644 index 0000000..6c4644c --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportEvent.cs @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading.Tasks; + +namespace Org.Apache.Reef.Wake.Remote.Impl +{ + public class TransportEvent<T> + { + public TransportEvent(T data, ILink<T> link) + { + Data = data; + Link = link; + } + + public T Data { get; private set; } + + public ILink<T> Link { get; private set; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportServer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportServer.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportServer.cs new file mode 100644 index 0000000..20cdc8a --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportServer.cs @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.Reef.Utilities.Diagnostics; +using Org.Apache.Reef.Utilities.Logging; +using System; +using System.Globalization; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.Reef.Wake.Util; + +namespace Org.Apache.Reef.Wake.Remote.Impl +{ + /// <summary> + /// Server to handle incoming remote messages. + /// </summary> + public class TransportServer<T> : IDisposable + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(TransportServer<>)); + + private TcpListener _listener; + private CancellationTokenSource _cancellationSource; + private IObserver<TransportEvent<T>> _remoteObserver; + private ICodec<T> _codec; + private bool _disposed; + private Task _serverTask; + + /// <summary> + /// Constructs a TransportServer to listen for remote events. + /// Listens on the specified remote endpoint. When it recieves a remote + /// event, it will envoke the specified remote handler. + /// </summary> + /// <param name="port">Port to listen on</param> + /// <param name="remoteHandler">The handler to invoke when receiving incoming + /// remote messages</param> + /// <param name="codec">The codec to encode/decode"</param> + public TransportServer(int port, IObserver<TransportEvent<T>> remoteHandler, ICodec<T> codec) + : this(new IPEndPoint(NetworkUtils.LocalIPAddress, port), remoteHandler, codec) + { + } + + /// <summary> + /// Constructs a TransportServer to listen for remote events. + /// Listens on the specified remote endpoint. When it recieves a remote + /// event, it will envoke the specified remote handler. + /// </summary> + /// <param name="localEndpoint">Endpoint to listen on</param> + /// <param name="remoteHandler">The handler to invoke when receiving incoming + /// remote messages</param> + /// <param name="codec">The codec to encode/decode"</param> + public TransportServer(IPEndPoint localEndpoint, + IObserver<TransportEvent<T>> remoteHandler, + ICodec<T> codec) + { + _listener = new TcpListener(localEndpoint.Address, localEndpoint.Port); + _remoteObserver = remoteHandler; + _cancellationSource = new CancellationTokenSource(); + _cancellationSource.Token.ThrowIfCancellationRequested(); + _codec = codec; + _disposed = false; + } + + /// <summary> + /// Returns the listening endpoint for the TransportServer + /// </summary> + public IPEndPoint LocalEndpoint + { + get { return _listener.LocalEndpoint as IPEndPoint; } + } + + /// <summary> + /// Starts listening for incoming remote messages. + /// </summary> + public void Run() + { + _listener.Start(); + _serverTask = Task.Run(() => StartServer()); + } + + /// <summary> + /// Close the TransportServer and all open connections + /// </summary> + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + public void Dispose(bool disposing) + { + if (!_disposed && disposing) + { + _cancellationSource.Cancel(); + try + { + _listener.Stop(); + } + catch (SocketException) + { + LOGGER.Log(Level.Info, "Disposing of transport server before listener is created."); + } + + if (_serverTask != null) + { + _serverTask.Wait(); + + // Give the TransportServer Task 500ms to shut down, ignore any timeout errors + try + { + CancellationTokenSource serverDisposeTimeout = new CancellationTokenSource(500); + _serverTask.Wait(serverDisposeTimeout.Token); + } + catch (Exception e) + { + Console.Error.WriteLine(e); + } + finally + { + _serverTask.Dispose(); + } + } + } + + _disposed = true; + } + + /// <summary> + /// Helper method to start TransportServer. This will + /// be run in an asynchronous Task. + /// </summary> + /// <returns>An asynchronous Task for the running server.</returns> + private async Task StartServer() + { + try + { + while (!_cancellationSource.Token.IsCancellationRequested) + { + TcpClient client = await _listener.AcceptTcpClientAsync().ConfigureAwait(false); + ProcessClient(client).Forget(); + } + } + catch (InvalidOperationException) + { + LOGGER.Log(Level.Info, "TransportServer has been closed."); + } + catch (OperationCanceledException) + { + LOGGER.Log(Level.Info, "TransportServer has been closed."); + } + } + + /// <summary> + /// Recieves event from connected TcpClient and invokes handler on the event. + /// </summary> + /// <param name="client">The connected client</param> + private async Task ProcessClient(TcpClient client) + { + // Keep reading messages from client until they disconnect or timeout + CancellationToken token = _cancellationSource.Token; + using (ILink<T> link = new Link<T>(client, _codec)) + { + while (!token.IsCancellationRequested) + { + T message = await link.ReadAsync(token); + TransportEvent<T> transportEvent = new TransportEvent<T>(message, link); + + _remoteObserver.OnNext(transportEvent); + + if (message == null) + { + break; + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Proto/WakeRemoteProtos.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/Proto/WakeRemoteProtos.cs b/lang/cs/Source/WAKE/Wake/Remote/Proto/WakeRemoteProtos.cs new file mode 100644 index 0000000..247fff9 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Remote/Proto/WakeRemoteProtos.cs @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using ProtoBuf; +using System; +using System.IO; +using System.Runtime.Serialization; + +namespace Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos +{ + /// <summary> + /// Message p buff + /// </summary> + public partial class WakeMessagePBuf + { + public static WakeMessagePBuf Deserialize(byte[] bytes) + { + WakeMessagePBuf pbuf = null; + using (var s = new MemoryStream(bytes)) + { + pbuf = Serializer.Deserialize<WakeMessagePBuf>(s); + } + return pbuf; + } + + public byte[] Serialize() + { + using (var s = new MemoryStream()) + { + Serializer.Serialize(s, this); + return s.ToArray(); + } + } + } + + /// <summary> + /// Wake tuple buf + /// </summary> + public partial class WakeTuplePBuf + { + public static WakeTuplePBuf Deserialize(byte[] bytes) + { + WakeTuplePBuf pbuf = null; + using (var s = new MemoryStream(bytes)) + { + pbuf = Serializer.Deserialize<WakeTuplePBuf>(s); + } + return pbuf; + } + + public byte[] Serialize() + { + using (var s = new MemoryStream()) + { + Serializer.Serialize(s, this); + return s.ToArray(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/RemoteConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/RemoteConfiguration.cs b/lang/cs/Source/WAKE/Wake/Remote/RemoteConfiguration.cs new file mode 100644 index 0000000..633cc79 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Remote/RemoteConfiguration.cs @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.Reef.Tang.Annotations; +using System; + +namespace Org.Apache.Reef.Wake.Remote +{ + public class RemoteConfiguration + { + [NamedParameter(shortName: "rm_name", documentation: "The name of the remote manager.")] + public class ManagerName : Name<string> + { + } + + [NamedParameter(shortName: "rm_host", documentation: "The host address to be used for messages.")] + public class HostAddress : Name<string> + { + } + + [NamedParameter(shortName: "rm_port", documentation: "The port to be used for messages.")] + public class Port : Name<int> + { + } + + [NamedParameter(documentation: "The codec to be used for messages.")] + public class MessageCodec : Name<ICodec<Type>> + { + } + + [NamedParameter(documentation: "The event handler to be used for exception")] + public class ErrorHandler : Name<IObserver<Exception>> + { + } + + [NamedParameter(shortName: "rm_order", documentation: "Whether or not to use the message ordering guarantee", defaultValue: "true")] + public class OrderingGuarantee : Name<bool> + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/RemoteRuntimeException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/RemoteRuntimeException.cs b/lang/cs/Source/WAKE/Wake/Remote/RemoteRuntimeException.cs new file mode 100644 index 0000000..061e532 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Remote/RemoteRuntimeException.cs @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; + +namespace Org.Apache.Reef.Wake.Remote +{ + /// <summary>Wake remote runtime exception</summary> + [System.Serializable] + public class RemoteRuntimeException : Exception + { + private const long serialVersionUID = 1L; + + /// <summary>Constructs a new runtime remote exception with the specified detail message and cause + /// </summary> + /// <param name="s">the detailed message</param> + /// <param name="e">the cause</param> + public RemoteRuntimeException(string s, Exception e) + : base(s, e) + { + } + + /// <summary>Constructs a new runtime remote exception with the specified detail message + /// </summary> + /// <param name="s">the detailed message</param> + public RemoteRuntimeException(string s) + : base(s) + { + } + + /// <summary>Constructs a new runtime remote exception with the specified cause</summary> + /// <param name="e">the cause</param> + public RemoteRuntimeException(Exception e) + : base("Runtime Exception", e) + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Event/Alarm.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Time/Event/Alarm.cs b/lang/cs/Source/WAKE/Wake/Time/Event/Alarm.cs new file mode 100644 index 0000000..611b5a1 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Time/Event/Alarm.cs @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; + +namespace Org.Apache.Reef.Wake.Time +{ + /// <summary> + /// Represents a timer event. + /// </summary> + public abstract class Alarm : Time + { + private IObserver<Alarm> _handler; + + public Alarm(long timestamp, IObserver<Alarm> handler) : base(timestamp) + { + _handler = handler; + } + + public void Handle() + { + _handler.OnNext(this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Event/StartTime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Time/Event/StartTime.cs b/lang/cs/Source/WAKE/Wake/Time/Event/StartTime.cs new file mode 100644 index 0000000..4cea1bc --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Time/Event/StartTime.cs @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.Reef.Wake.Time +{ + /// <summary> + /// Represents the Time at which a component started. + /// </summary> + public class StartTime : Time + { + public StartTime(long timeStamp) : base(timeStamp) + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Event/StopTime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Time/Event/StopTime.cs b/lang/cs/Source/WAKE/Wake/Time/Event/StopTime.cs new file mode 100644 index 0000000..8e3bf65 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Time/Event/StopTime.cs @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.Reef.Wake.Time +{ + /// <summary> + /// Represents the Time at which a component stops. + /// </summary> + public class StopTime : Time + { + public StopTime(long timeStamp) : base(timeStamp) + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/IClock.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Time/IClock.cs b/lang/cs/Source/WAKE/Wake/Time/IClock.cs new file mode 100644 index 0000000..6d906b8 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Time/IClock.cs @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; + +using Org.Apache.Reef.Tang.Annotations; +using Org.Apache.Reef.Wake.Impl; +using Org.Apache.Reef.Wake.Time.Runtime.Event; + +namespace Org.Apache.Reef.Wake.Time +{ + public abstract class IClock : IDisposable + { + /// <summary> + /// Schedule a TimerEvent at the given future offset + /// </summary> + /// <param name="offset">The offset in the future to schedule the alarm</param> + /// <param name="handler">The IObserver to to be called</param> + public abstract void ScheduleAlarm(long offset, IObserver<Alarm> handler); + + /// <summary> + /// Clock is idle if it has no future alarms set + /// </summary> + /// <returns>True if no future alarms are set, otherwise false</returns> + public abstract bool IsIdle(); + + /// <summary> + /// Dispose of the clock and all scheduled alarms + /// </summary> + public abstract void Dispose(); + + /// <summary> + /// Bind this to an event handler to statically subscribe to the StartTime Event + /// </summary> + [NamedParameter(documentation: "Will be called upon the start even", defaultClass: typeof(MissingStartHandlerHandler))] + public class StartHandler : Name<ISet<IObserver<StartTime>>> + { + } + + /// <summary> + /// Bind this to an event handler to statically subscribe to the StopTime Event + /// </summary> + [NamedParameter(documentation: "Will be called upon the stop event", defaultClass: typeof(LoggingEventHandler<StopTime>))] + public class StopHandler : Name<ISet<IObserver<StopTime>>> + { + } + + /// <summary> + /// Bind this to an event handler to statically subscribe to the RuntimeStart Event + /// </summary> + [NamedParameter(documentation: "Will be called upon the runtime start event", defaultClass: typeof(LoggingEventHandler<RuntimeStart>))] + public class RuntimeStartHandler : Name<ISet<IObserver<RuntimeStart>>> + { + } + + /// <summary> + /// Bind this to an event handler to statically subscribe to the RuntimeStop Event + /// </summary> + [NamedParameter(documentation: "Will be called upon the runtime stop event", defaultClass: typeof(LoggingEventHandler<RuntimeStop>))] + public class RuntimeStopHandler : Name<ISet<IObserver<RuntimeStop>>> + { + } + + /// <summary> + /// Bind this to an event handler to statically subscribe to the IdleClock Event + /// </summary> + [NamedParameter(documentation: "Will be called upon the Idle event", defaultClass: typeof(LoggingEventHandler<IdleClock>))] + public class IdleHandler : Name<ISet<IObserver<IdleClock>>> + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/ClientAlarm.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/ClientAlarm.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/ClientAlarm.cs new file mode 100644 index 0000000..ac95896 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/ClientAlarm.cs @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Org.Apache.Reef.Wake.Time.Runtime.Event +{ + public class ClientAlarm : Alarm + { + public ClientAlarm(long timestamp, IObserver<Alarm> handler) : base(timestamp, handler) + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/IdleClock.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/IdleClock.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/IdleClock.cs new file mode 100644 index 0000000..3ca1fd8 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/IdleClock.cs @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.Reef.Wake.Time.Runtime.Event +{ + public class IdleClock : Time + { + public IdleClock(long timestamp) : base(timestamp) + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeAlarm.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeAlarm.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeAlarm.cs new file mode 100644 index 0000000..701bf88 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeAlarm.cs @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Org.Apache.Reef.Wake.Time.Runtime.Event +{ + public class RuntimeAlarm : Alarm + { + public RuntimeAlarm(long timestamp, IObserver<Alarm> handler) : base(timestamp, handler) + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeStart.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeStart.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeStart.cs new file mode 100644 index 0000000..d08cccc --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeStart.cs @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.Reef.Wake.Time.Runtime.Event +{ + public class RuntimeStart : Time + { + public RuntimeStart(long timeStamp) : base(timeStamp) + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeStop.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeStop.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeStop.cs new file mode 100644 index 0000000..6324e20 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeStop.cs @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; + +namespace Org.Apache.Reef.Wake.Time.Runtime.Event +{ + public class RuntimeStop : Time + { + public RuntimeStop(long timestamp) : this(timestamp, null) + { + } + + public RuntimeStop(long timestamp, Exception e) : base(timestamp) + { + Exception = e; + } + + public Exception Exception { get; private set; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/ITimer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/ITimer.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/ITimer.cs new file mode 100644 index 0000000..c932aa4 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/ITimer.cs @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Org.Apache.Reef.Tang.Annotations; + +namespace Org.Apache.Reef.Wake.Time.Runtime +{ + [DefaultImplementation(typeof(RealTimer))] + public interface ITimer + { + /// <summary> + /// Gets the current time + /// </summary> + long CurrentTime { get; } + + /// <summary> + /// Gets the difference between the given time and the current time + /// </summary> + /// <param name="time">The time to compare against the current time</param> + long GetDuration(long time); + + /// <summary> + /// Checks if the given time has already passed. + /// </summary> + /// <param name="time">The time to check if it has passed or not</param> + bool IsReady(long time); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/LogicalTimer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/LogicalTimer.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/LogicalTimer.cs new file mode 100644 index 0000000..f4b6c75 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/LogicalTimer.cs @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Org.Apache.Reef.Tang.Annotations; +using Org.Apache.Reef.Wake.Time.Runtime.Event; + +namespace Org.Apache.Reef.Wake.Time.Runtime +{ + /// <summary> + /// LogicalTimer class used for testing purposes. + /// </summary> + public class LogicalTimer : ITimer + { + [Inject] + public LogicalTimer() + { + } + + public long CurrentTime + { + get { return 0; } + } + + public long GetDuration(long time) + { + return 0; + } + + public bool IsReady(long time) + { + return true; + } + } +}
