using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

namespace Org.Apache.REEF.Wake.Remote.Impl
{
    /// <summary>
    /// Establishes a connection to a TransportServer for remote message passing.
    /// </summary>
    /// <typeparam name="T">The type of message sent over the link</typeparam>
    public class TransportClient<T> : IDisposable
    {
        private readonly ILink<T> _link;
        private readonly IObserver<TransportEvent<T>> _observer;
        private readonly CancellationTokenSource _cancellationSource;
        private bool _disposed;

        /// <summary>
        /// Construct a TransportClient.
        /// Used to send messages to the specified remote endpoint.
        /// </summary>
        /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
        /// <param name="codec">Codec used to encode/decode messages</param>
        /// <exception cref="ArgumentNullException">
        /// If <paramref name="remoteEndpoint"/> or <paramref name="codec"/> is null
        /// </exception>
        public TransportClient(IPEndPoint remoteEndpoint, ICodec<T> codec)
        {
            if (remoteEndpoint == null)
            {
                throw new ArgumentNullException("remoteEndpoint");
            }
            if (codec == null)
            {
                throw new ArgumentNullException("codec");
            }

            _link = new Link<T>(remoteEndpoint, codec);
            _cancellationSource = new CancellationTokenSource();
            _disposed = false;
        }

        /// <summary>
        /// Construct a TransportClient.
        /// Used to send messages to the specified remote endpoint, and to forward
        /// every message read back from the link to the given observer.
        /// </summary>
        /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
        /// <param name="codec">Codec used to encode/decode messages</param>
        /// <param name="observer">Callback used when receiving responses from remote host</param>
        public TransportClient(IPEndPoint remoteEndpoint,
                               ICodec<T> codec,
                               IObserver<TransportEvent<T>> observer)
            : this(remoteEndpoint, codec)
        {
            _observer = observer;

            // The read loop runs in the background until the link reports
            // end-of-stream (null message) or the client is disposed.
            Task.Run(() => ResponseLoop());
        }

        /// <summary>
        /// Gets the underlying transport link.
        /// </summary>
        public ILink<T> Link
        {
            get { return _link; }
        }

        /// <summary>
        /// Send the remote message.
        /// </summary>
        /// <param name="message">The message to send</param>
        /// <exception cref="ArgumentNullException">If <paramref name="message"/> is null</exception>
        public void Send(T message)
        {
            if (message == null)
            {
                throw new ArgumentNullException("message");
            }

            _link.Write(message);
        }

        /// <summary>
        /// Stops the response loop and closes the underlying link.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases the link when called from <see cref="Dispose()"/>.
        /// Calling it more than once has no further effect.
        /// </summary>
        /// <param name="disposing">True when invoked from Dispose()</param>
        protected void Dispose(bool disposing)
        {
            if (!_disposed && disposing)
            {
                // Signal the response loop first so that it stops issuing reads
                // against a link that is about to be disposed. The token source
                // itself is intentionally not disposed: the loop may still be
                // inspecting it on another thread.
                _cancellationSource.Cancel();
                _link.Dispose();
                _disposed = true;
            }
        }

        /// <summary>
        /// Continually read responses from remote host and hand them to the observer.
        /// Ends when the link returns null or when the client has been disposed.
        /// </summary>
        private async Task ResponseLoop()
        {
            CancellationToken token = _cancellationSource.Token;
            try
            {
                while (!token.IsCancellationRequested)
                {
                    T message = await _link.ReadAsync(token);
                    if (message == null)
                    {
                        break;
                    }

                    TransportEvent<T> transportEvent = new TransportEvent<T>(message, _link);
                    _observer.OnNext(transportEvent);
                }
            }
            catch (OperationCanceledException)
            {
                // Expected only as a consequence of Dispose(); anything else is a real failure.
                if (!token.IsCancellationRequested)
                {
                    throw;
                }
            }
            catch (ObjectDisposedException)
            {
                // The link was torn down by Dispose() while a read was pending.
                if (!token.IsCancellationRequested)
                {
                    throw;
                }
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportEvent.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportEvent.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportEvent.cs new file mode 100644 index 0000000..0e1eff7 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportEvent.cs @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace Org.Apache.REEF.Wake.Remote.Impl
{
    /// <summary>
    /// Pairs a message payload with the link it is associated with.
    /// </summary>
    /// <typeparam name="T">The type of the message payload</typeparam>
    public class TransportEvent<T>
    {
        /// <summary>
        /// Creates an event carrying the given payload and link.
        /// </summary>
        /// <param name="data">The message payload</param>
        /// <param name="link">The link associated with the payload</param>
        public TransportEvent(T data, ILink<T> link)
        {
            this.Link = link;
            this.Data = data;
        }

        /// <summary>
        /// The message payload supplied at construction.
        /// </summary>
        public T Data { get; private set; }

        /// <summary>
        /// The link supplied at construction.
        /// </summary>
        public ILink<T> Link { get; private set; }
    }
}
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
using System;
using System.Globalization;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Org.Apache.REEF.Wake.Util;

namespace Org.Apache.REEF.Wake.Remote.Impl
{
    /// <summary>
    /// Server to handle incoming remote messages.
    /// </summary>
    /// <typeparam name="T">The type of message read from connected clients</typeparam>
    public class TransportServer<T> : IDisposable
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(TransportServer<>));

        /// <summary>
        /// How long Dispose waits for the accept loop to finish, in milliseconds.
        /// </summary>
        private const int ShutdownTimeoutMs = 500;

        private readonly TcpListener _listener;
        private readonly CancellationTokenSource _cancellationSource;
        private readonly IObserver<TransportEvent<T>> _remoteObserver;
        private readonly ICodec<T> _codec;
        private bool _disposed;
        private Task _serverTask;

        /// <summary>
        /// Constructs a TransportServer to listen for remote events.
        /// Listens on the given port of the local IP address. When it receives a remote
        /// event, it will invoke the specified remote handler.
        /// </summary>
        /// <param name="port">Port to listen on</param>
        /// <param name="remoteHandler">The handler to invoke when receiving incoming
        /// remote messages</param>
        /// <param name="codec">The codec to encode/decode</param>
        public TransportServer(int port, IObserver<TransportEvent<T>> remoteHandler, ICodec<T> codec)
            : this(new IPEndPoint(NetworkUtils.LocalIPAddress, port), remoteHandler, codec)
        {
        }

        /// <summary>
        /// Constructs a TransportServer to listen for remote events.
        /// Listens on the specified local endpoint. When it receives a remote
        /// event, it will invoke the specified remote handler.
        /// </summary>
        /// <param name="localEndpoint">Endpoint to listen on</param>
        /// <param name="remoteHandler">The handler to invoke when receiving incoming
        /// remote messages</param>
        /// <param name="codec">The codec to encode/decode</param>
        /// <exception cref="ArgumentNullException">If <paramref name="localEndpoint"/> is null</exception>
        public TransportServer(IPEndPoint localEndpoint,
                               IObserver<TransportEvent<T>> remoteHandler,
                               ICodec<T> codec)
        {
            if (localEndpoint == null)
            {
                throw new ArgumentNullException("localEndpoint");
            }

            _listener = new TcpListener(localEndpoint.Address, localEndpoint.Port);
            _remoteObserver = remoteHandler;
            _cancellationSource = new CancellationTokenSource();
            _codec = codec;
            _disposed = false;
        }

        /// <summary>
        /// Returns the listening endpoint for the TransportServer
        /// </summary>
        public IPEndPoint LocalEndpoint
        {
            get { return _listener.LocalEndpoint as IPEndPoint; }
        }

        /// <summary>
        /// Starts listening for incoming remote messages.
        /// The accept loop runs on a background task.
        /// </summary>
        public void Run()
        {
            _listener.Start();
            _serverTask = Task.Run(() => StartServer());
        }

        /// <summary>
        /// Close the TransportServer and all open connections
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Cancels the accept loop, stops the listener and waits a bounded amount of
        /// time for the server task to finish. Failures during shutdown are logged
        /// rather than thrown.
        /// </summary>
        /// <param name="disposing">True when invoked from Dispose()</param>
        public void Dispose(bool disposing)
        {
            if (!_disposed && disposing)
            {
                _cancellationSource.Cancel();
                try
                {
                    _listener.Stop();
                }
                catch (SocketException)
                {
                    LOGGER.Log(Level.Info, "Disposing of transport server before listener is created.");
                }

                if (_serverTask != null)
                {
                    // Give the TransportServer Task a bounded time to shut down; never block
                    // indefinitely and never let a faulted server task escape from Dispose.
                    try
                    {
                        if (!_serverTask.Wait(ShutdownTimeoutMs))
                        {
                            LOGGER.Log(Level.Info, "TransportServer task did not shut down within the timeout.");
                        }
                    }
                    catch (Exception e)
                    {
                        LOGGER.Log(Level.Info, "Error while waiting for TransportServer to shut down: " + e);
                    }
                    finally
                    {
                        // Task.Dispose throws if the task has not reached a final state.
                        if (_serverTask.IsCompleted)
                        {
                            _serverTask.Dispose();
                        }
                    }
                }
            }

            _disposed = true;
        }

        /// <summary>
        /// Helper method to start TransportServer. This will
        /// be run in an asynchronous Task. Accepts clients until cancellation
        /// is requested or the listener is stopped.
        /// </summary>
        /// <returns>An asynchronous Task for the running server.</returns>
        private async Task StartServer()
        {
            try
            {
                while (!_cancellationSource.Token.IsCancellationRequested)
                {
                    TcpClient client = await _listener.AcceptTcpClientAsync().ConfigureAwait(false);
                    ProcessClient(client).Forget();
                }
            }
            catch (InvalidOperationException)
            {
                LOGGER.Log(Level.Info, "TransportServer has been closed.");
            }
            catch (OperationCanceledException)
            {
                LOGGER.Log(Level.Info, "TransportServer has been closed.");
            }
            catch (SocketException)
            {
                // NOTE(review): some runtimes appear to surface a pending accept aborted by
                // Stop() as a SocketException; treat it as a normal shutdown only when
                // cancellation was actually requested.
                if (!_cancellationSource.IsCancellationRequested)
                {
                    throw;
                }

                LOGGER.Log(Level.Info, "TransportServer has been closed.");
            }
        }

        /// <summary>
        /// Receives events from the connected TcpClient and invokes the handler on each event.
        /// A null message from the link ends the loop and is not forwarded to the handler.
        /// </summary>
        /// <param name="client">The connected client</param>
        private async Task ProcessClient(TcpClient client)
        {
            // Keep reading messages from client until they disconnect or timeout
            CancellationToken token = _cancellationSource.Token;
            using (ILink<T> link = new Link<T>(client, _codec))
            {
                while (!token.IsCancellationRequested)
                {
                    T message = await link.ReadAsync(token);
                    if (message == null)
                    {
                        // End of stream: stop before notifying, matching TransportClient's loop.
                        break;
                    }

                    TransportEvent<T> transportEvent = new TransportEvent<T>(message, link);
                    _remoteObserver.OnNext(transportEvent);
                }
            }
        }
    }
}
using ProtoBuf;
using System;
using System.IO;
using System.Runtime.Serialization;

namespace Org.Apache.REEF.Wake.Remote.Proto.WakeRemoteProtos
{
    /// <summary>
    /// Byte-array serialization helpers for WakeMessagePBuf.
    /// </summary>
    public partial class WakeMessagePBuf
    {
        /// <summary>
        /// Reads a WakeMessagePBuf from the given bytes using the protobuf Serializer.
        /// </summary>
        /// <param name="bytes">The serialized message</param>
        /// <returns>The deserialized message</returns>
        public static WakeMessagePBuf Deserialize(byte[] bytes)
        {
            using (MemoryStream input = new MemoryStream(bytes))
            {
                return Serializer.Deserialize<WakeMessagePBuf>(input);
            }
        }

        /// <summary>
        /// Writes this message to a new byte array using the protobuf Serializer.
        /// </summary>
        /// <returns>The serialized message</returns>
        public byte[] Serialize()
        {
            using (MemoryStream output = new MemoryStream())
            {
                Serializer.Serialize(output, this);
                byte[] serialized = output.ToArray();
                return serialized;
            }
        }
    }

    /// <summary>
    /// Byte-array serialization helpers for WakeTuplePBuf.
    /// </summary>
    public partial class WakeTuplePBuf
    {
        /// <summary>
        /// Reads a WakeTuplePBuf from the given bytes using the protobuf Serializer.
        /// </summary>
        /// <param name="bytes">The serialized tuple</param>
        /// <returns>The deserialized tuple</returns>
        public static WakeTuplePBuf Deserialize(byte[] bytes)
        {
            using (MemoryStream input = new MemoryStream(bytes))
            {
                return Serializer.Deserialize<WakeTuplePBuf>(input);
            }
        }

        /// <summary>
        /// Writes this tuple to a new byte array using the protobuf Serializer.
        /// </summary>
        /// <returns>The serialized tuple</returns>
        public byte[] Serialize()
        {
            using (MemoryStream output = new MemoryStream())
            {
                Serializer.Serialize(output, this);
                byte[] serialized = output.ToArray();
                return serialized;
            }
        }
    }
}
using Org.Apache.REEF.Tang.Annotations;
using System;

namespace Org.Apache.REEF.Wake.Remote
{
    /// <summary>
    /// Container for the named parameters used to configure remote messaging.
    /// </summary>
    public class RemoteConfiguration
    {
        /// <summary>Named parameter: the name of the remote manager.</summary>
        [NamedParameter(documentation: "The name of the remote manager.", shortName: "rm_name")]
        public class ManagerName : Name<string>
        {
        }

        /// <summary>Named parameter: the host address to be used for messages.</summary>
        [NamedParameter(documentation: "The host address to be used for messages.", shortName: "rm_host")]
        public class HostAddress : Name<string>
        {
        }

        /// <summary>Named parameter: the port to be used for messages.</summary>
        [NamedParameter(documentation: "The port to be used for messages.", shortName: "rm_port")]
        public class Port : Name<int>
        {
        }

        /// <summary>Named parameter: the codec to be used for messages.</summary>
        [NamedParameter(documentation: "The codec to be used for messages.")]
        public class MessageCodec : Name<ICodec<Type>>
        {
        }

        /// <summary>Named parameter: the observer that receives exceptions.</summary>
        [NamedParameter(documentation: "The event handler to be used for exception")]
        public class ErrorHandler : Name<IObserver<Exception>>
        {
        }

        /// <summary>
        /// Named parameter: whether to use the message ordering guarantee.
        /// The declared default value is "true".
        /// </summary>
        [NamedParameter(documentation: "Whether or not to use the message ordering guarantee", shortName: "rm_order", defaultValue: "true")]
        public class OrderingGuarantee : Name<bool>
        {
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/RemoteRuntimeException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/RemoteRuntimeException.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/RemoteRuntimeException.cs new file mode 100644 index 0000000..150e5c8 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/RemoteRuntimeException.cs @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
using System;

namespace Org.Apache.REEF.Wake.Remote
{
    /// <summary>Wake remote runtime exception</summary>
    [System.Serializable]
    public class RemoteRuntimeException : Exception
    {
        /// <summary>Constructs a new runtime remote exception with the specified detail message and cause
        /// </summary>
        /// <param name="s">the detailed message</param>
        /// <param name="e">the cause</param>
        public RemoteRuntimeException(string s, Exception e)
            : base(s, e)
        {
        }

        /// <summary>Constructs a new runtime remote exception with the specified detail message
        /// </summary>
        /// <param name="s">the detailed message</param>
        public RemoteRuntimeException(string s)
            : base(s)
        {
        }

        /// <summary>Constructs a new runtime remote exception with the specified cause.
        /// The message is fixed to "Runtime Exception".</summary>
        /// <param name="e">the cause</param>
        public RemoteRuntimeException(Exception e)
            : base("Runtime Exception", e)
        {
        }

        /// <summary>
        /// Serialization constructor. A type marked [Serializable] that derives from
        /// Exception needs this constructor for the runtime to be able to deserialize it.
        /// </summary>
        /// <param name="info">The serialized object data</param>
        /// <param name="context">The source/destination context of the serialization</param>
        protected RemoteRuntimeException(
            System.Runtime.Serialization.SerializationInfo info,
            System.Runtime.Serialization.StreamingContext context)
            : base(info, context)
        {
        }
    }
}
using System;

namespace Org.Apache.REEF.Wake.Time
{
    /// <summary>
    /// Represents a timer event.
    /// </summary>
    public abstract class Alarm : Time
    {
        // Assigned once in the constructor and never replaced afterwards.
        private readonly IObserver<Alarm> _handler;

        /// <summary>
        /// Creates an alarm with the given timestamp and the observer to notify.
        /// </summary>
        /// <param name="timestamp">The timestamp passed to the Time base class</param>
        /// <param name="handler">The observer invoked by <see cref="Handle"/></param>
        public Alarm(long timestamp, IObserver<Alarm> handler) : base(timestamp)
        {
            _handler = handler;
        }

        /// <summary>
        /// Passes this alarm to the handler's OnNext.
        /// </summary>
        public void Handle()
        {
            _handler.OnNext(this);
        }
    }
}
namespace Org.Apache.REEF.Wake.Time
{
    /// <summary>
    /// Represents the Time at which a component started.
    /// </summary>
    public class StartTime : Time
    {
        /// <summary>
        /// Creates a StartTime event for the given timestamp.
        /// </summary>
        /// <param name="timeStamp">The timestamp passed to the Time base class</param>
        public StartTime(long timeStamp)
            : base(timeStamp)
        {
        }
    }
}

namespace Org.Apache.REEF.Wake.Time
{
    /// <summary>
    /// Represents the Time at which a component stops.
    /// </summary>
    public class StopTime : Time
    {
        /// <summary>
        /// Creates a StopTime event for the given timestamp.
        /// </summary>
        /// <param name="timeStamp">The timestamp passed to the Time base class</param>
        public StopTime(long timeStamp)
            : base(timeStamp)
        {
        }
    }
}
using System;
using System.Collections.Generic;

using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Wake.Impl;
using Org.Apache.REEF.Wake.Time.Runtime.Event;

namespace Org.Apache.REEF.Wake.Time
{
    /// <summary>
    /// Abstract clock that schedules alarms, together with the named parameters
    /// used to bind handlers for its lifecycle events.
    /// </summary>
    public abstract class IClock : IDisposable
    {
        /// <summary>
        /// Schedule a TimerEvent at the given future offset
        /// </summary>
        /// <param name="offset">The offset in the future to schedule the alarm</param>
        /// <param name="handler">The IObserver to be called</param>
        public abstract void ScheduleAlarm(long offset, IObserver<Alarm> handler);

        /// <summary>
        /// Clock is idle if it has no future alarms set
        /// </summary>
        /// <returns>True if no future alarms are set, otherwise false</returns>
        public abstract bool IsIdle();

        /// <summary>
        /// Dispose of the clock and all scheduled alarms
        /// </summary>
        public abstract void Dispose();

        /// <summary>
        /// Bind this to an event handler to statically subscribe to the StartTime Event
        /// </summary>
        [NamedParameter(
            documentation: "Will be called upon the start even",
            defaultClass: typeof(MissingStartHandlerHandler))]
        public class StartHandler : Name<ISet<IObserver<StartTime>>>
        {
        }

        /// <summary>
        /// Bind this to an event handler to statically subscribe to the StopTime Event
        /// </summary>
        [NamedParameter(
            documentation: "Will be called upon the stop event",
            defaultClass: typeof(LoggingEventHandler<StopTime>))]
        public class StopHandler : Name<ISet<IObserver<StopTime>>>
        {
        }

        /// <summary>
        /// Bind this to an event handler to statically subscribe to the RuntimeStart Event
        /// </summary>
        [NamedParameter(
            documentation: "Will be called upon the runtime start event",
            defaultClass: typeof(LoggingEventHandler<RuntimeStart>))]
        public class RuntimeStartHandler : Name<ISet<IObserver<RuntimeStart>>>
        {
        }

        /// <summary>
        /// Bind this to an event handler to statically subscribe to the RuntimeStop Event
        /// </summary>
        [NamedParameter(
            documentation: "Will be called upon the runtime stop event",
            defaultClass: typeof(LoggingEventHandler<RuntimeStop>))]
        public class RuntimeStopHandler : Name<ISet<IObserver<RuntimeStop>>>
        {
        }

        /// <summary>
        /// Bind this to an event handler to statically subscribe to the IdleClock Event
        /// </summary>
        [NamedParameter(
            documentation: "Will be called upon the Idle event",
            defaultClass: typeof(LoggingEventHandler<IdleClock>))]
        public class IdleHandler : Name<ISet<IObserver<IdleClock>>>
        {
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Org.Apache.REEF.Wake.Time.Runtime.Event
{
    /// <summary>
    /// Concrete Alarm type; adds no members beyond those of Alarm.
    /// </summary>
    public class ClientAlarm : Alarm
    {
        /// <summary>
        /// Creates a ClientAlarm, forwarding both arguments to the Alarm base class.
        /// </summary>
        /// <param name="timestamp">The timestamp of the alarm</param>
        /// <param name="handler">The observer associated with the alarm</param>
        public ClientAlarm(long timestamp, IObserver<Alarm> handler)
            : base(timestamp, handler)
        {
        }
    }
}
namespace Org.Apache.REEF.Wake.Time.Runtime.Event
{
    /// <summary>
    /// Time event type for the clock's Idle event.
    /// </summary>
    public class IdleClock : Time
    {
        /// <summary>
        /// Creates an IdleClock event for the given timestamp.
        /// </summary>
        /// <param name="timestamp">The timestamp passed to the Time base class</param>
        public IdleClock(long timestamp)
            : base(timestamp)
        {
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Org.Apache.REEF.Wake.Time.Runtime.Event
{
    /// <summary>
    /// Concrete Alarm type; adds no members beyond those of Alarm.
    /// </summary>
    public class RuntimeAlarm : Alarm
    {
        /// <summary>
        /// Creates a RuntimeAlarm, forwarding both arguments to the Alarm base class.
        /// </summary>
        /// <param name="timestamp">The timestamp of the alarm</param>
        /// <param name="handler">The observer associated with the alarm</param>
        public RuntimeAlarm(long timestamp, IObserver<Alarm> handler)
            : base(timestamp, handler)
        {
        }
    }
}
namespace Org.Apache.REEF.Wake.Time.Runtime.Event
{
    /// <summary>
    /// Time event announcing that the runtime clock has begun running.
    /// RuntimeClock.Run publishes it first, before the StartTime event.
    /// </summary>
    public class RuntimeStart : Time
    {
        /// <summary>
        /// Creates a runtime-start event carrying the given timestamp.
        /// </summary>
        /// <param name="timeStamp">Timestamp handed unchanged to the Time base constructor</param>
        public RuntimeStart(long timeStamp)
            : base(timeStamp)
        {
        }
    }
}
using System;

namespace Org.Apache.REEF.Wake.Time.Runtime.Event
{
    /// <summary>
    /// Time event announcing that the runtime clock has finished running,
    /// optionally carrying an exception.
    /// </summary>
    public class RuntimeStop : Time
    {
        /// <summary>
        /// Creates a runtime-stop event that carries an exception.
        /// </summary>
        /// <param name="timestamp">Timestamp handed unchanged to the Time base constructor</param>
        /// <param name="e">Exception to expose through <see cref="Exception"/>; may be null</param>
        public RuntimeStop(long timestamp, Exception e)
            : base(timestamp)
        {
            Exception = e;
        }

        /// <summary>
        /// Creates a runtime-stop event with no exception attached.
        /// </summary>
        /// <param name="timestamp">Timestamp handed unchanged to the Time base constructor</param>
        public RuntimeStop(long timestamp)
            : this(timestamp, null)
        {
        }

        /// <summary>
        /// Gets the exception supplied at construction, or null when none was given.
        /// </summary>
        public Exception Exception { get; private set; }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Org.Apache.REEF.Tang.Annotations;

namespace Org.Apache.REEF.Wake.Time.Runtime
{
    /// <summary>
    /// Source of time for the runtime clock. RealTimer is the default implementation.
    /// </summary>
    [DefaultImplementation(typeof(RealTimer))]
    public interface ITimer
    {
        /// <summary>
        /// Gets the timer's notion of "now".
        /// </summary>
        long CurrentTime { get; }

        /// <summary>
        /// Gets the difference between the given time and the current time.
        /// </summary>
        /// <param name="time">The time to measure against the current time</param>
        /// <returns>The difference, in the same unit as <see cref="CurrentTime"/></returns>
        long GetDuration(long time);

        /// <summary>
        /// Tells whether the given time has already passed.
        /// </summary>
        /// <param name="time">The time to test</param>
        /// <returns>True when the given time has passed, otherwise false</returns>
        bool IsReady(long time);
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Wake.Time.Runtime.Event;

namespace Org.Apache.REEF.Wake.Time.Runtime
{
    /// <summary>
    /// Timer stub intended for tests: time never advances and every
    /// timestamp is reported as already due.
    /// </summary>
    public class LogicalTimer : ITimer
    {
        [Inject]
        public LogicalTimer()
        {
        }

        /// <summary>
        /// Always zero.
        /// </summary>
        public long CurrentTime
        {
            get
            {
                return 0;
            }
        }

        /// <summary>
        /// Always reports the given time as ready.
        /// </summary>
        /// <param name="time">Ignored</param>
        /// <returns>Always true</returns>
        public bool IsReady(long time)
        {
            return true;
        }

        /// <summary>
        /// Always reports a zero duration.
        /// </summary>
        /// <param name="time">Ignored</param>
        /// <returns>Always zero</returns>
        public long GetDuration(long time)
        {
            return 0;
        }
    }
}
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Wake.Time.Runtime.Event;

namespace Org.Apache.REEF.Wake.Time.Runtime
{
    /// <summary>
    /// Timer backed by the system clock, with millisecond resolution.
    /// </summary>
    public class RealTimer : ITimer
    {
        [Inject]
        public RealTimer()
        {
        }

        /// <summary>
        /// Gets the current UTC time in milliseconds, counted from the DateTime
        /// tick origin (midnight, January 1, 0001). This is not the Unix epoch.
        /// </summary>
        /// <remarks>
        /// UTC is used rather than local time so that differences between two
        /// readings are not distorted by daylight-saving or time-zone changes.
        /// </remarks>
        public long CurrentTime
        {
            get { return DateTime.UtcNow.Ticks / TimeSpan.TicksPerMillisecond; }
        }

        /// <summary>
        /// Gets the number of milliseconds from now until the given time.
        /// </summary>
        /// <param name="time">A timestamp on the same scale as <see cref="CurrentTime"/></param>
        /// <returns>
        /// <paramref name="time"/> minus the current time: positive when the time
        /// lies in the future, zero or negative once it has been reached.
        /// </returns>
        public long GetDuration(long time)
        {
            return time - CurrentTime;
        }

        /// <summary>
        /// Checks if the given time has already been reached.
        /// </summary>
        /// <param name="time">A timestamp on the same scale as <see cref="CurrentTime"/></param>
        /// <returns>True when the remaining duration is zero or negative</returns>
        public bool IsReady(long time)
        {
            return GetDuration(time) <= 0;
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Exceptions;
using Org.Apache.REEF.Tang.Implementations;
using Org.Apache.REEF.Wake.RX.Impl;
using Org.Apache.REEF.Tang.Implementations.InjectionPlan;

namespace Org.Apache.REEF.Wake.Time.Runtime.Event
{
    /// <summary>
    /// Clock that keeps a time-ordered schedule of events and, once Run is
    /// called, processes them one by one until a StopTime event is reached.
    /// The schedule doubles as the monitor object: producers pulse it after
    /// changing it and the run loop waits on it.
    /// </summary>
    public class RuntimeClock : IClock
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(RuntimeClock));

        private ITimer _timer;

        // Subject through which every non-alarm Time event is published.
        private PubSubSubject<Time> _handlers;

        // Pending events, kept in a SortedSet; also used as the lock/monitor object.
        private ISet<Time> _schedule;

        private IInjectionFuture<ISet<IObserver<StartTime>>> _startHandler;
        private IInjectionFuture<ISet<IObserver<StopTime>>> _stopHandler;
        private IInjectionFuture<ISet<IObserver<RuntimeStart>>> _runtimeStartHandler;
        private IInjectionFuture<ISet<IObserver<RuntimeStop>>> _runtimeStopHandler;
        private IInjectionFuture<ISet<IObserver<IdleClock>>> _idleHandler;

        // Set by Dispose while holding the schedule lock.
        // NOTE(review): ScheduleAlarm and RegisterObserver read this flag without
        // taking the lock - confirm that a late read is acceptable.
        private bool _disposed;

        /// <summary>
        /// Create a new RuntimeClock with injectable IObservers
        /// </summary>
        /// <param name="timer">The runtime clock timer</param>
        /// <param name="startHandler">The start handler</param>
        /// <param name="stopHandler">The stop handler</param>
        /// <param name="runtimeStartHandler">The runtime start handler</param>
        /// <param name="runtimeStopHandler">The runtime stop handler</param>
        /// <param name="idleHandler">The idle handler</param>
        [Inject]
        internal RuntimeClock(
            ITimer timer,
            [Parameter(typeof(StartHandler))] IInjectionFuture<ISet<IObserver<StartTime>>> startHandler,
            [Parameter(typeof(StopHandler))] IInjectionFuture<ISet<IObserver<StopTime>>> stopHandler,
            [Parameter(typeof(RuntimeStartHandler))] IInjectionFuture<ISet<IObserver<RuntimeStart>>> runtimeStartHandler,
            [Parameter(typeof(RuntimeStopHandler))] IInjectionFuture<ISet<IObserver<RuntimeStop>>> runtimeStopHandler,
            [Parameter(typeof(IdleHandler))] IInjectionFuture<ISet<IObserver<IdleClock>>> idleHandler)
        {
            _timer = timer;
            _schedule = new SortedSet<Time>();
            _handlers = new PubSubSubject<Time>();

            // The futures are only stored here; they are resolved later, in SubscribeHandlers.
            _startHandler = startHandler;
            _stopHandler = stopHandler;
            _runtimeStartHandler = runtimeStartHandler;
            _runtimeStopHandler = runtimeStopHandler;
            _idleHandler = idleHandler;
        }

        /// <summary>
        /// Gets or sets the future that supplies the runtime start observers.
        /// </summary>
        public IInjectionFuture<ISet<IObserver<RuntimeStart>>> InjectedRuntimeStartHandler
        {
            get { return _runtimeStartHandler; }
            set { _runtimeStartHandler = value; }
        }

        /// <summary>
        /// Gets or sets the future that supplies the runtime stop observers.
        /// </summary>
        public IInjectionFuture<ISet<IObserver<RuntimeStop>>> InjectedRuntimeStopHandler
        {
            get { return _runtimeStopHandler; }
            set { _runtimeStopHandler = value; }
        }

        /// <summary>
        /// Schedule a TimerEvent at the given future offset.
        /// Does nothing once the clock has been disposed.
        /// </summary>
        /// <param name="offset">The offset in the future to schedule the alarm</param>
        /// <param name="handler">The IObserver to be called</param>
        public override void ScheduleAlarm(long offset, IObserver<Alarm> handler)
        {
            if (_disposed)
            {
                return;
            }
            if (handler == null)
            {
                Exceptions.Throw(new ArgumentNullException("handler"), LOGGER);
            }

            lock (_schedule)
            {
                _schedule.Add(new ClientAlarm(_timer.CurrentTime + offset, handler));

                // Wake the run loop so it can re-evaluate which event is due first.
                Monitor.PulseAll(_schedule);
            }
        }

        /// <summary>
        /// Clock is idle if it has no future alarms set
        /// </summary>
        /// <returns>True if no future alarms are set, otherwise false</returns>
        public override bool IsIdle()
        {
            lock (_schedule)
            {
                return _schedule.Count == 0;
            }
        }

        /// <summary>
        /// Dispose of the clock and all scheduled alarms.
        /// Pending events are discarded and replaced by a single StopTime event,
        /// which makes the run loop exit.
        /// </summary>
        public override void Dispose()
        {
            lock (_schedule)
            {
                _schedule.Clear();
                _schedule.Add(new StopTime(_timer.CurrentTime));
                Monitor.PulseAll(_schedule);
                _disposed = true;
            }
        }

        /// <summary>
        /// Register the IObserver for the particular Time event.
        /// Does nothing once the clock has been disposed.
        /// </summary>
        /// <param name="observer">The handler to register</param>
        public void RegisterObserver<U>(IObserver<U> observer) where U : Time
        {
            if (_disposed)
            {
                return;
            }

            _handlers.Subscribe(observer);
        }

        /// <summary>
        /// Start the RuntimeClock.
        /// Clock will continue to run and handle events until it has been disposed.
        /// Publishes RuntimeStart and StartTime before the loop and RuntimeStop after it.
        /// </summary>
        public void Run()
        {
            SubscribeHandlers();
            _handlers.OnNext(new RuntimeStart(_timer.CurrentTime));
            _handlers.OnNext(new StartTime(_timer.CurrentTime));

            while (true)
            {
                lock (_schedule)
                {
                    if (IsIdle())
                    {
                        _handlers.OnNext(new IdleClock(_timer.CurrentTime));
                    }

                    // Blocks and releases lock until it receives the next event
                    Time alarm = GetNextEvent();

                    // NOTE(review): the event is processed while the schedule lock is
                    // still held, so other threads calling ScheduleAlarm, IsIdle or
                    // Dispose wait until the handler returns - confirm this is intended.
                    ProcessEvent(alarm);

                    if (alarm is StopTime)
                    {
                        break;
                    }
                }
            }
            _handlers.OnNext(new RuntimeStop(_timer.CurrentTime));
        }

        /// <summary>
        /// Register the event handlers by resolving each injected future
        /// and subscribing every observer it yields.
        /// </summary>
        private void SubscribeHandlers()
        {
            Subscribe(_startHandler.Get());
            Subscribe(_stopHandler.Get());
            Subscribe(_runtimeStartHandler.Get());
            Subscribe(_runtimeStopHandler.Get());
            Subscribe(_idleHandler.Get());
        }

        /// <summary>
        /// Subscribe a set of IObservers for a particular Time event
        /// </summary>
        /// <param name="observers">The set of observers to subscribe</param>
        private void Subscribe<U>(ISet<IObserver<U>> observers) where U : Time
        {
            foreach (IObserver<U> observer in observers)
            {
                _handlers.Subscribe(observer);
            }
        }

        /// <summary>
        /// Wait until the first scheduled alarm is ready to be handled.
        /// Assumes that we have a lock on the _schedule SortedSet
        /// </summary>
        /// <returns>The earliest scheduled event, already removed from the schedule</returns>
        private Time GetNextEvent()
        {
            // Wait for an alarm to be scheduled on the condition variable Count
            while (_schedule.Count == 0)
            {
                Monitor.Wait(_schedule);
            }

            // Once the alarm is scheduled, wait for the prescribed amount of time.
            // If a new alarm is scheduled with a shorter duration, Wait will preempt
            // and duration will update to reflect the new alarm's timestamp
            for (long duration = _timer.GetDuration(_schedule.First().TimeStamp);
                 duration > 0;
                 duration = _timer.GetDuration(_schedule.First().TimeStamp))
            {
                Monitor.Wait(_schedule, TimeSpan.FromMilliseconds(duration));
            }

            Time time = _schedule.First();
            _schedule.Remove(time);
            return time;
        }

        /// <summary>
        /// Process the next Time event.
        /// Alarms are handled through their own Handle method; every other
        /// event is published to the subscribed observers.
        /// </summary>
        /// <param name="time">The Time event to handle</param>
        private void ProcessEvent(Time time)
        {
            if (time is Alarm)
            {
                Alarm alarm = (Alarm) time;
                alarm.Handle();
            }
            else
            {
                _handlers.OnNext(time);
            }
        }
    }
}
using System;
using System.Globalization;
using System.Threading;

namespace Org.Apache.REEF.Wake.Time
{
    /// <summary>
    /// Base class for timestamped events. Instances are ordered by timestamp;
    /// events sharing a timestamp are ordered by creation sequence, so two
    /// distinct instances never compare as equal.
    /// </summary>
    public abstract class Time : IComparable<Time>
    {
        // Process-wide counter used to hand every instance a unique sequence number.
        private static long _lastSequenceNumber;

        // Unique per instance; breaks ties between events with the same timestamp.
        // Object hash codes are not guaranteed to be unique, so using them as the
        // tie-break could make two different events compare as equal and cause a
        // sorted collection to silently drop one of them.
        private readonly long _sequenceNumber;

        /// <summary>
        /// Creates an event with the given timestamp.
        /// </summary>
        /// <param name="timeStamp">The timestamp of the event</param>
        public Time(long timeStamp)
        {
            TimeStamp = timeStamp;
            _sequenceNumber = Interlocked.Increment(ref _lastSequenceNumber);
        }

        /// <summary>
        /// Gets the timestamp supplied at construction.
        /// </summary>
        public long TimeStamp { get; private set; }

        /// <summary>
        /// Formats the event as "TypeName:[timestamp]" using the invariant culture.
        /// </summary>
        public override string ToString()
        {
            return string.Format(CultureInfo.InvariantCulture, "{0}:[{1}]", GetType().Name, TimeStamp);
        }

        /// <summary>
        /// Returns the default object hash code. Since only an instance equals
        /// itself (see <see cref="Equals(object)"/>), this stays consistent with equality.
        /// </summary>
        public override int GetHashCode()
        {
            return base.GetHashCode();
        }

        /// <summary>
        /// Two events are equal only when they compare as equal, which in
        /// practice means they are the same instance.
        /// </summary>
        /// <param name="obj">The object to compare with; may be null</param>
        /// <returns>True when <paramref name="obj"/> is this same event</returns>
        public override bool Equals(object obj)
        {
            if (ReferenceEquals(this, obj))
            {
                return true;
            }

            Time other = obj as Time;
            return other != null && CompareTo(other) == 0;
        }

        /// <summary>
        /// Orders events by timestamp, then by creation sequence.
        /// </summary>
        /// <param name="other">The event to compare with; null sorts before every event</param>
        /// <returns>
        /// A negative value when this event sorts first, a positive value when
        /// <paramref name="other"/> sorts first, zero only for the same instance.
        /// </returns>
        public int CompareTo(Time other)
        {
            // By IComparable convention every instance is greater than null.
            if (ReferenceEquals(other, null))
            {
                return 1;
            }

            int byTimeStamp = TimeStamp.CompareTo(other.TimeStamp);
            if (byTimeStamp != 0)
            {
                return byTimeStamp;
            }

            return _sequenceNumber.CompareTo(other._sequenceNumber);
        }
    }
}
using System.Threading;

namespace Org.Apache.REEF.Wake.Util
{
    /// <summary>
    /// Wraps an optional ThreadStart delegate so it can be invoked through Call.
    /// </summary>
    public class Actionable
    {
        // Null when the instance was created without a delegate.
        private readonly ThreadStart _threadStart;

        /// <summary>
        /// Creates an Actionable with no delegate; Call does nothing on such an instance.
        /// </summary>
        public Actionable()
        {
        }

        /// <summary>
        /// Creates an Actionable around the given delegate.
        /// </summary>
        /// <param name="threadStart">The delegate to run from Call; may be null</param>
        internal Actionable(ThreadStart threadStart)
        {
            _threadStart = threadStart;
        }

        /// <summary>
        /// Runs the wrapped delegate synchronously on the calling thread.
        /// Does nothing when no delegate was supplied.
        /// </summary>
        public void Call()
        {
            // The parameterless constructor leaves the delegate unset, so guard
            // against it instead of failing with a NullReferenceException.
            if (_threadStart != null)
            {
                _threadStart();
            }
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Org.Apache.REEF.Wake.Util
{
    /// <summary>
    /// Generates IDisposables from a factory method
    /// </summary>
    internal class Disposable : IDisposable
    {
        private readonly Action _disposeFunction;

        // True once Dispose has run; makes repeated Dispose calls no-ops.
        private bool _disposed;

        private Disposable(Action disposeFunction)
        {
            _disposeFunction = disposeFunction;
            _disposed = false;
        }

        /// <summary>
        /// Factory method to create an IDisposable from a function.
        /// </summary>
        /// <param name="disposeFunction">The function to call when disposing</param>
        /// <returns>An IDisposable from the given dispose function</returns>
        public static IDisposable Create(Action disposeFunction)
        {
            return new Disposable(disposeFunction);
        }

        /// <summary>
        /// Dispose of resources by calling the supplied dispose function.
        /// The function runs at most once, however many times Dispose is called.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            // Mark as disposed before invoking, so that a second call - including
            // one made from inside the dispose function itself - does nothing.
            _disposed = true;

            if (_disposeFunction != null)
            {
                _disposeFunction();
            }
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Time.Runtime.Event;

namespace Org.Apache.REEF.Wake.Util
{
    /// <summary>
    /// Task service that runs submitted work on a scheduler with a fixed
    /// maximum degree of parallelism.
    /// </summary>
    public class FixedThreadPoolTaskService : ITaskService
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(FixedThreadPoolTaskService));

        TaskFactory factory;

        // Every task handed out by Submit; also used as the lock object.
        // NOTE(review): nothing in this class removes finished tasks (RemoveTask
        // is never called here), so the list keeps growing - confirm that a
        // caller elsewhere prunes it.
        List<Task> tasks = new List<Task>();
        bool shuttingDown;

        /// <summary>
        /// Creates the service.
        /// </summary>
        /// <param name="maxDegreeOfParallelism">Maximum number of tasks allowed to run concurrently</param>
        internal FixedThreadPoolTaskService(int maxDegreeOfParallelism)
        {
            LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(maxDegreeOfParallelism);
            factory = new TaskFactory(lcts);
        }

        /// <summary>
        /// Waits for all tracked tasks to finish.
        /// </summary>
        /// <param name="n">
        /// Not used by this implementation.
        /// NOTE(review): the name suggests a count of <paramref name="unit"/>s,
        /// but only <paramref name="unit"/> is applied - confirm against callers.
        /// </param>
        /// <param name="unit">The longest time to wait</param>
        /// <returns>
        /// True when no task is tracked or all tracked tasks completed within
        /// <paramref name="unit"/>; otherwise false.
        /// </returns>
        public bool AwaitTermination(long n, TimeSpan unit)
        {
            Task[] allTasks;
            lock (tasks)
            {
                if (tasks.Count == 0)
                {
                    return true;
                }

                // Snapshot under the lock; wait outside it so Submit is not blocked.
                allTasks = tasks.ToArray();
            }
            return Task.WaitAll(allTasks, unit);
        }

        /// <summary>
        /// Same as <see cref="Shutdown"/>: stops accepting new work.
        /// Already submitted tasks are not cancelled.
        /// </summary>
        public void ShutdownNow()
        {
            Shutdown();
        }

        /// <summary>
        /// Stops accepting new work; later Submit calls are rejected.
        /// </summary>
        public void Shutdown()
        {
            lock (tasks)
            {
                shuttingDown = true;
            }
        }

        /// <summary>
        /// Schedules the function on the limited-concurrency scheduler and tracks the task.
        /// </summary>
        /// <param name="c">The function to run</param>
        /// <returns>The task running <paramref name="c"/></returns>
        public Task<T> Submit<T>(Func<T> c)
        {
            Task<T> task = null;
            lock (tasks)
            {
                if (shuttingDown)
                {
                    Exceptions.Throw(new InvalidOperationException("Shutting down"), LOGGER);
                }

                // No cancellation token is passed: the previous per-call
                // CancellationTokenSource was never cancelled, stored or disposed,
                // so it only produced an undisposed object per submission.
                task = factory.StartNew(c);
                tasks.Add(task);
            }
            return task;
        }

        /// <summary>
        /// Runs the delegate synchronously on the calling thread.
        /// It is not scheduled on the task factory and is not tracked.
        /// </summary>
        /// <param name="threadStart">The delegate to run</param>
        public void Execute(ThreadStart threadStart)
        {
            new Actionable(threadStart).Call();
        }

        /// <summary>
        /// Stops tracking the given task.
        /// </summary>
        /// <param name="task">The task to remove from the tracked list</param>
        internal void RemoveTask(Task task)
        {
            lock (tasks)
            {
                tasks.Remove(task);
            }
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Org.Apache.REEF.Wake.Util
{
    /// <summary>
    /// Implemented by components that expose a parameterless Start operation.
    /// </summary>
    public interface IStartable
    {
        /// <summary>
        /// Starts the component.
        /// </summary>
        void Start();
    }
}
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Org.Apache.REEF.Wake.Util
{
    /// <summary>
    /// Service that accepts units of work, either as a delegate to execute
    /// or as a function whose result is delivered through a Task.
    /// </summary>
    public interface ITaskService
    {
        /// <summary>
        /// Shuts the service down.
        /// </summary>
        void Shutdown();

        /// <summary>
        /// Executes the given delegate.
        /// </summary>
        /// <param name="threadStart">The delegate to execute</param>
        void Execute(ThreadStart threadStart);

        /// <summary>
        /// Submits a function for execution.
        /// </summary>
        /// <param name="ob">The function to run</param>
        /// <returns>A task representing the function's execution and result</returns>
        Task<T> Submit<T>(Func<T> ob);
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Wake.Util
{
    /// <summary>
    /// Task scheduler that runs queued tasks on ThreadPool threads while
    /// keeping the number of concurrently active worker delegates at or
    /// below a fixed maximum.
    /// </summary>
    internal class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(LimitedConcurrencyLevelTaskScheduler));

        /// <summary>Whether the current thread is processing work items.</summary>
        [ThreadStatic]
        private static bool _currentThreadIsProcessingItems;

        /// <summary>The list of tasks to be executed.</summary>
        private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)

        /// <summary>The maximum concurrency level allowed by this scheduler.</summary>
        private readonly int _maxDegreeOfParallelism;

        /// <summary>
        /// Number of worker delegates currently queued to, or running on, the ThreadPool.
        /// Incremented when a worker is requested and decremented when a worker
        /// finds the queue empty and exits.
        /// </summary>
        private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks)

        /// <summary>
        /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
        /// specified degree of parallelism.
        /// </summary>
        /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
        public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
        {
            if (maxDegreeOfParallelism < 1)
            {
                Exceptions.Throw(new ArgumentOutOfRangeException("maxDegreeOfParallelism"), LOGGER);
            }
            _maxDegreeOfParallelism = maxDegreeOfParallelism;
        }

        /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
        public sealed override int MaximumConcurrencyLevel
        {
            get
            {
                return _maxDegreeOfParallelism;
            }
        }

        /// <summary>Queues a task to the scheduler.</summary>
        /// <param name="task">The task to be queued.</param>
        protected sealed override void QueueTask(Task task)
        {
            // Add the task to the list of tasks to be processed.  If there aren't enough
            // delegates currently queued or running to process tasks, schedule another.
            lock (_tasks)
            {
                _tasks.AddLast(task);
                if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
                {
                    ++_delegatesQueuedOrRunning;
                    NotifyThreadPoolOfPendingWork();
                }
            }
        }

        /// <summary>Attempts to execute the specified task on the current thread.</summary>
        /// <param name="task">The task to be executed.</param>
        /// <param name="taskWasPreviouslyQueued">
        /// Whether the task was queued earlier; when true the task is first removed from the queue.
        /// </param>
        /// <returns>Whether the task could be executed on the current thread.</returns>
        protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // If this thread isn't already processing a task, we don't support inlining
            if (!_currentThreadIsProcessingItems)
            {
                return false;
            }

            // If the task was previously queued, remove it from the queue
            // NOTE(review): the result of TryDequeue is ignored, so execution is
            // attempted even when the task was no longer in the queue - confirm
            // this is intended.
            if (taskWasPreviouslyQueued)
            {
                TryDequeue(task);
            }

            // Try to run the task.
            return TryExecuteTask(task);
        }

        /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
        /// <param name="task">The task to be removed.</param>
        /// <returns>Whether the task could be found and removed.</returns>
        protected sealed override bool TryDequeue(Task task)
        {
            lock (_tasks)
            {
                return _tasks.Remove(task);
            }
        }

        /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
        /// <returns>An enumerable of the tasks currently scheduled.</returns>
        /// <exception cref="NotSupportedException">The queue lock could not be taken immediately.</exception>
        protected sealed override IEnumerable<Task> GetScheduledTasks()
        {
            bool lockTaken = false;
            try
            {
                // Non-blocking attempt: never wait for the lock here.
                Monitor.TryEnter(_tasks, ref lockTaken);
                if (lockTaken)
                {
                    // Return a copy so the caller enumerates a stable snapshot.
                    return _tasks.ToArray();
                }
                else
                {
                    throw new NotSupportedException();
                }
            }
            finally
            {
                if (lockTaken)
                {
                    Monitor.Exit(_tasks);
                }
            }
        }

        /// <summary>
        /// Informs the ThreadPool that there's work to be executed for this scheduler.
        /// The queued worker keeps taking tasks from the front of the queue until it is empty.
        /// </summary>
        private void NotifyThreadPoolOfPendingWork()
        {
            ThreadPool.UnsafeQueueUserWorkItem(_ =>
            {
                // Note that the current thread is now processing work items.
                // This is necessary to enable inlining of tasks into this thread.
                _currentThreadIsProcessingItems = true;
                try
                {
                    // Process all available items in the queue.
                    while (true)
                    {
                        Task item;
                        lock (_tasks)
                        {
                            // When there are no more items to be processed,
                            // note that we're done processing, and get out.
                            if (_tasks.Count == 0)
                            {
                                --_delegatesQueuedOrRunning;
                                break;
                            }
                            // Get the next item from the queue
                            item = _tasks.First.Value;
                            _tasks.RemoveFirst();
                        }
                        // Execute the task we pulled out of the queue
                        base.TryExecuteTask(item);
                    }
                }
                // We're done processing items on the current thread
                finally
                {
                    _currentThreadIsProcessingItems = false;
                }
            }, null);
        }
    }
}
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Net.Sockets;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Util
+{
+    /// <summary>
+    /// Static helpers for discovering the local IPv4 address and picking random port numbers.
+    /// </summary>
+    public class NetworkUtils
+    {
+        private static IPAddress _localAddress;
+        private static readonly Random _random = new Random();
+
+        /// <summary>
+        /// Guards _random: System.Random instance members are not thread-safe, and this
+        /// generator is shared by every caller of the static GenerateRandomPort method.
+        /// </summary>
+        private static readonly object _randomLock = new object();
+
+        /// <summary>
+        /// Returns an IPv4 address of this machine. The host's addresses are resolved through
+        /// DNS, filtered to the InterNetwork family, and the one whose string form sorts first
+        /// is chosen. The result is cached in a static field after the first lookup.
+        /// </summary>
+        /// <returns>The machine's local IP Address</returns>
+        /// <exception cref="InvalidOperationException">
+        /// Thrown by First() when the host has no InterNetwork (IPv4) address.
+        /// </exception>
+        public static IPAddress LocalIPAddress
+        {
+            get
+            {
+                if (_localAddress == null)
+                {
+                    IPAddress[] localIps = Dns.GetHostAddresses(Dns.GetHostName());
+                    _localAddress = localIps.Where(i => i.AddressFamily.Equals(AddressFamily.InterNetwork))
+                                            .OrderBy(ip => ip.ToString())
+                                            .First();
+                }
+
+                return _localAddress;
+            }
+        }
+
+        /// <summary>
+        /// Generate a random port between low (inclusive) and high (exclusive).
+        /// Safe to call from multiple threads concurrently.
+        /// </summary>
+        /// <param name="low">The inclusive lower bound of the port range</param>
+        /// <param name="high">The exclusive upper bound of the port range</param>
+        /// <returns>The randomly generated port</returns>
+        /// <exception cref="ArgumentOutOfRangeException">
+        /// Thrown by Random.Next when low is greater than high.
+        /// </exception>
+        public static int GenerateRandomPort(int low, int high)
+        {
+            // Unsynchronized concurrent use can corrupt a Random instance's internal state,
+            // so access to the shared generator is serialized.
+            lock (_randomLock)
+            {
+                return _random.Next(low, high);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Util/SerializationHelper.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Util/SerializationHelper.cs b/lang/cs/Org.Apache.REEF.Wake/Util/SerializationHelper.cs
new file mode 100644
index 0000000..1c29382
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Util/SerializationHelper.cs
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using ProtoBuf;
+using System;
+using System.IO;
+using System.Runtime.Serialization;
+
+namespace Org.Apache.REEF.Wake.Util
+{
+    /// <summary>
+    /// Converts objects to and from byte arrays by routing them through an in-memory
+    /// stream and the Serializer class.
+    /// </summary>
+    public class SerializationHelper
+    {
+        /// <summary>Writes an object into a fresh in-memory stream and returns the bytes written.</summary>
+        /// <typeparam name="T">The type of the object being serialized</typeparam>
+        /// <param name="t">The object to serialize</param>
+        /// <returns>A copy of the stream contents produced by Serializer.Serialize</returns>
+        public static byte[] Serialize<T>(T t)
+        {
+            using (MemoryStream buffer = new MemoryStream())
+            {
+                Serializer.Serialize(buffer, t);
+                byte[] payload = buffer.ToArray();
+                return payload;
+            }
+        }
+
+        /// <summary>Reads an object of the requested type back out of a byte array.</summary>
+        /// <typeparam name="T">The type to deserialize into</typeparam>
+        /// <param name="bytes">The serialized form, wrapped in a MemoryStream for reading</param>
+        /// <returns>The value produced by Serializer.Deserialize</returns>
+        public static T Deserialize<T>(byte[] bytes)
+        {
+            using (MemoryStream input = new MemoryStream(bytes))
+            {
+                T decoded = Serializer.Deserialize<T>(input);
+                return decoded;
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Util/TaskExtensions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Util/TaskExtensions.cs b/lang/cs/Org.Apache.REEF.Wake/Util/TaskExtensions.cs
new file mode 100644
index 0000000..69e4972
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Util/TaskExtensions.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Util
+{
+    /// <summary>Extension methods for Task.</summary>
+    public static class TaskExtensions
+    {
+        /// <summary>
+        /// Accepts a task and does nothing with it; the body is intentionally empty.
+        /// NOTE(review): presumably called on fire-and-forget tasks to make the intent explicit
+        /// and to avoid "call is not awaited" warnings - confirm with callers. It does not wait
+        /// for the task, inspect its result, or observe its exceptions.
+        /// </summary>
+        /// <param name="task">The task to ignore; it is never dereferenced</param>
+        public static void Forget(this Task task)
+        {
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Util/TimeHelper.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Util/TimeHelper.cs b/lang/cs/Org.Apache.REEF.Wake/Util/TimeHelper.cs
new file mode 100644
index 0000000..2f913cc
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Util/TimeHelper.cs
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Threading;
+
+namespace Org.Apache.REEF.Wake.Util
+{
+    /// <summary>
+    /// Tick-based time conversion constants and helpers.
+    /// NOTE(review): a .NET tick is 100 ns, so scaling ticks by 1/100 (as every helper here
+    /// does) yields units of 10 microseconds rather than nanoseconds. The members are mutually
+    /// consistent; confirm what callers expect before changing the scale.
+    /// </summary>
+    public class TimeHelper
+    {
+        /// <summary>Number of ticks in one millisecond.</summary>
+        public const long TicksPerMilliSecond = 10000;
+
+        /// <summary>Number of ticks in one microsecond.</summary>
+        public const long TicksPerMicroSecond = 10;
+
+        /// <summary>Factor the helpers below multiply a tick count by.</summary>
+        public const double TicksPerNanoSecond = .01;
+
+        /// <summary>
+        /// The tick count of the current local time (DateTime.Now), integer-divided by 100.
+        /// </summary>
+        public static long CurrentTimeToNanoSeconds
+        {
+            get
+            {
+                long localNowTicks = DateTime.Now.Ticks;
+                return localNowTicks / 100;
+            }
+        }
+
+        /// <summary>
+        /// Scales the tick count of a time span by TicksPerNanoSecond and truncates to a long.
+        /// </summary>
+        /// <param name="timeSpan">The time span to convert</param>
+        /// <returns>The scaled tick count with its fractional part discarded</returns>
+        public static long AsLongNanoSeconds(TimeSpan timeSpan)
+        {
+            long spanTicks = timeSpan.Ticks;
+            return (long)(spanTicks * TicksPerNanoSecond);
+        }
+
+        /// <summary>
+        /// Scales the tick count of a time span by TicksPerNanoSecond, keeping the fraction.
+        /// </summary>
+        /// <param name="timeSpan">The time span to convert</param>
+        /// <returns>The scaled tick count as a double</returns>
+        public static double AsDoubleNanoSeconds(TimeSpan timeSpan)
+        {
+            long spanTicks = timeSpan.Ticks;
+            return spanTicks * TicksPerNanoSecond;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/WakeRuntimeException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/WakeRuntimeException.cs b/lang/cs/Org.Apache.REEF.Wake/WakeRuntimeException.cs
new file mode 100644
index 0000000..2e21a60
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/WakeRuntimeException.cs
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.REEF.Wake
+{
+    /// <summary>Wake runtime exception</summary>
+    [System.Serializable]
+    public class WakeRuntimeException : Exception
+    {
+        // Not referenced anywhere in this class.
+        // NOTE(review): looks like a leftover from a Java serialVersionUID - confirm before removing.
+        private const long serialVersionUID = 1L;
+
+        /// <summary>Constructs a new runtime wake exception with the specified detail message and cause
+        /// </summary>
+        /// <param name="s">the detailed message</param>
+        /// <param name="e">the cause</param>
+        public WakeRuntimeException(string s, Exception e)
+            : base(s, e)
+        {
+        }
+
+        /// <summary>Constructs a new runtime wake exception with the specified detail message
+        /// </summary>
+        /// <param name="s">the detailed message</param>
+        public WakeRuntimeException(string s)
+            : base(s)
+        {
+        }
+
+        /// <summary>Constructs a new runtime wake exception with the specified cause and the
+        /// fixed message "Runtime Exception"</summary>
+        /// <param name="e">the cause</param>
+        public WakeRuntimeException(Exception e)
+            : base("Runtime Exception", e)
+        {
+        }
+
+        /// <summary>
+        /// Constructs the exception from serialized data. A type marked [Serializable] that
+        /// derives from Exception needs this constructor for the runtime serializer to be able
+        /// to rehydrate it; without it deserialization fails.
+        /// </summary>
+        /// <param name="info">the serialized object data</param>
+        /// <param name="context">the source or destination of the serialized data</param>
+        protected WakeRuntimeException(
+            System.Runtime.Serialization.SerializationInfo info,
+            System.Runtime.Serialization.StreamingContext context)
+            : base(info, context)
+        {
+        }
+    }
+}
