http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/RealTimer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/RealTimer.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/RealTimer.cs new file mode 100644 index 0000000..2e99202 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/RealTimer.cs @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Org.Apache.Reef.Tang.Annotations; +using Org.Apache.Reef.Wake.Time.Runtime.Event; + +namespace Org.Apache.Reef.Wake.Time.Runtime +{ + public class RealTimer : ITimer + { + [Inject] + public RealTimer() + { + } + + /// <summary> + /// Gets the number of milliseconds since Epoch + /// </summary> + public long CurrentTime + { + get { return DateTime.Now.Ticks / TimeSpan.TicksPerMillisecond; } + } + + /// <summary> + /// Gets the difference between the given time and the current time + /// </summary> + /// <param name="time">The time to compare against the current time</param> + public long GetDuration(long time) + { + return time - CurrentTime; + } + + /// <summary> + /// Checks if the given time has already passed. + /// </summary> + /// <param name="time">The time to check if it has passed or not</param> + public bool IsReady(long time) + { + return GetDuration(time) <= 0; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/RuntimeClock.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/RuntimeClock.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/RuntimeClock.cs new file mode 100644 index 0000000..a8c77af --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/RuntimeClock.cs @@ -0,0 +1,257 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reactive; +using System.Reactive.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.Reef.Utilities.Diagnostics; +using Org.Apache.Reef.Utilities.Logging; +using Org.Apache.Reef.Tang.Annotations; +using Org.Apache.Reef.Tang.Exceptions; +using Org.Apache.Reef.Tang.Implementations; +using Org.Apache.Reef.Wake.RX.Impl; + +namespace Org.Apache.Reef.Wake.Time.Runtime.Event +{ + public class RuntimeClock : IClock + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(RuntimeClock)); + + private ITimer _timer; + private PubSubSubject<Time> _handlers; + private ISet<Time> _schedule; + + private IInjectionFuture<ISet<IObserver<StartTime>>> _startHandler; + private IInjectionFuture<ISet<IObserver<StopTime>>> _stopHandler; + private IInjectionFuture<ISet<IObserver<RuntimeStart>>> _runtimeStartHandler; + private IInjectionFuture<ISet<IObserver<RuntimeStop>>> _runtimeStopHandler; + private IInjectionFuture<ISet<IObserver<IdleClock>>> _idleHandler; + + private bool _disposed; + + /// <summary> + /// Create a new RuntimeClock with injectable IObservers + /// </summary> + /// <param name="timer">The runtime clock timer</param> + /// <param name="startHandler">The start handler</param> + /// <param name="stopHandler">The stop handler</param> + /// <param name="runtimeStartHandler">The runtime start handler</param> + /// <param name="runtimeStopHandler">The runtime stop handler</param> + /// <param name="idleHandler">The idle handler</param> + [Inject] + internal RuntimeClock( + ITimer timer, + [Parameter(typeof(StartHandler))] IInjectionFuture<ISet<IObserver<StartTime>>> startHandler, + [Parameter(typeof(StopHandler))] IInjectionFuture<ISet<IObserver<StopTime>>> stopHandler, + [Parameter(typeof(RuntimeStartHandler))] IInjectionFuture<ISet<IObserver<RuntimeStart>>> runtimeStartHandler, + [Parameter(typeof(RuntimeStopHandler))] IInjectionFuture<ISet<IObserver<RuntimeStop>>> runtimeStopHandler, + [Parameter(typeof(IdleHandler))] IInjectionFuture<ISet<IObserver<IdleClock>>> idleHandler) + { + _timer = timer; + _schedule = new SortedSet<Time>(); + _handlers = new PubSubSubject<Time>(); + + _startHandler = startHandler; + _stopHandler = stopHandler; + _runtimeStartHandler = runtimeStartHandler; + _runtimeStopHandler = runtimeStopHandler; + _idleHandler = idleHandler; + } + + public IInjectionFuture<ISet<IObserver<RuntimeStart>>> InjectedRuntimeStartHandler + { + get { return _runtimeStartHandler; } + set { _runtimeStartHandler = value; } + } + + public IInjectionFuture<ISet<IObserver<RuntimeStop>>> InjectedRuntimeStopHandler + { + get { return _runtimeStopHandler; } + set { _runtimeStopHandler = value; } + } + + /// <summary> + /// Schedule a TimerEvent at the given future offset + /// </summary> + /// <param name="offset">The offset in the future to schedule the alarm</param> + /// <param name="handler">The IObserver to to be called</param> + public override void ScheduleAlarm(long offset, IObserver<Alarm> handler) + { + if (_disposed) + { + return; + } + if (handler == null) + { + Exceptions.Throw(new ArgumentNullException("handler"), LOGGER); + } + + lock (_schedule) + { + _schedule.Add(new ClientAlarm(_timer.CurrentTime + offset, handler)); + Monitor.PulseAll(_schedule); + } + } + + /// <summary> + /// Clock is idle if it has no future alarms set + /// </summary> + /// <returns>True if no future alarms are set, otherwise false</returns> + public override bool IsIdle() + { + lock (_schedule) + { + return _schedule.Count == 0; + } + } + + /// <summary> + /// Dispose of the clock and all scheduled alarms + /// </summary> + public override void Dispose() + { + lock (_schedule) + { + _schedule.Clear(); + _schedule.Add(new StopTime(_timer.CurrentTime)); + Monitor.PulseAll(_schedule); + _disposed = true; + } + } + + /// <summary> + /// Register the IObserver for the particular Time event. + /// </summary> + /// <param name="observer">The handler to register</param> + public void RegisterObserver<U>(IObserver<U> observer) where U : Time + { + if (_disposed) + { + return; + } + + _handlers.Subscribe(observer); + } + + /// <summary> + /// Start the RuntimeClock. + /// Clock will continue to run and handle events until it has been disposed. + /// </summary> + public void Run() + { + SubscribeHandlers(); + _handlers.OnNext(new RuntimeStart(_timer.CurrentTime)); + _handlers.OnNext(new StartTime(_timer.CurrentTime)); + + while (true) + { + lock (_schedule) + { + if (IsIdle()) + { + _handlers.OnNext(new IdleClock(_timer.CurrentTime)); + } + + // Blocks and releases lock until it receives the next event + Time alarm = GetNextEvent(); + ProcessEvent(alarm); + + if (alarm is StopTime) + { + break; + } + } + } + _handlers.OnNext(new RuntimeStop(_timer.CurrentTime)); + } + + /// <summary> + /// Register the event handlers + /// </summary> + private void SubscribeHandlers() + { + Subscribe(_startHandler.Get()); + Subscribe(_stopHandler.Get()); + Subscribe(_runtimeStartHandler.Get()); + Subscribe(_runtimeStopHandler.Get()); + Subscribe(_idleHandler.Get()); + } + + /// <summary> + /// Subscribe a set of IObservers for a particular Time event + /// </summary> + /// <param name="observers">The set of observers to subscribe</param> + private void Subscribe<U>(ISet<IObserver<U>> observers) where U : Time + { + foreach (IObserver<U> observer in observers) + { + _handlers.Subscribe(observer); + } + } + + /// <summary> + /// Wait until the first scheduled alarm is ready to be handled + /// Assumes that we have a lock on the _schedule SortedSet + /// </summary> + private Time GetNextEvent() + { + // Wait for an alarm to be scheduled on the condition variable Count + while (_schedule.Count == 0) + { + Monitor.Wait(_schedule); + } + + // Once the alarm is scheduled, wait for the prescribed amount of time. + // If a new alarm is scheduled with a shorter duration, Wait will preempt + // and duration will update to reflect the new alarm's timestamp + for (long duration = _timer.GetDuration(_schedule.First().TimeStamp); + duration > 0; + duration = _timer.GetDuration(_schedule.First().TimeStamp)) + { + Monitor.Wait(_schedule, TimeSpan.FromMilliseconds(duration)); + } + + Time time = _schedule.First(); + _schedule.Remove(time); + return time; + } + + /// <summary> + /// Process the next Time event. + /// </summary> + /// <param name="time">The Time event to handle</param> + private void ProcessEvent(Time time) + { + if (time is Alarm) + { + Alarm alarm = (Alarm) time; + alarm.Handle(); + } + else + { + _handlers.OnNext(time); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Time.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Time/Time.cs b/lang/cs/Source/WAKE/Wake/Time/Time.cs new file mode 100644 index 0000000..0ebc1a7 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Time/Time.cs @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Globalization; + +namespace Org.Apache.Reef.Wake.Time +{ + /// <summary> + /// Time object + /// </summary> + public abstract class Time : IComparable<Time> + { + public Time(long timeStamp) + { + TimeStamp = timeStamp; + } + + public long TimeStamp { get; private set; } + + public override string ToString() + { + return string.Format(CultureInfo.InvariantCulture, "{0}:[{1}]", GetType().Name, TimeStamp); + } + + public override int GetHashCode() + { + return base.GetHashCode(); + } + + public override bool Equals(object obj) + { + if (this == obj) + { + return true; + } + Time other = obj as Time; + if (other != null) + { + return CompareTo(other) == 0; + } + return false; + } + + public int CompareTo(Time other) + { + if (TimeStamp < other.TimeStamp) + { + return -1; + } + if (TimeStamp > other.TimeStamp) + { + return 1; + } + if (GetHashCode() < other.GetHashCode()) + { + return -1; + } + if (GetHashCode() > other.GetHashCode()) + { + return 1; + } + return 0; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Util/Actionable.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Util/Actionable.cs b/lang/cs/Source/WAKE/Wake/Util/Actionable.cs new file mode 100644 index 0000000..5e5364c --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Util/Actionable.cs @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Threading; + +namespace Org.Apache.Reef.Wake.Util +{ + public class Actionable + { + private readonly ThreadStart _threadStart; + + public Actionable() + { + } + + internal Actionable(ThreadStart threadStart) + { + _threadStart = threadStart; + } + + public void Call() + { + _threadStart(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Util/Disposable.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Util/Disposable.cs b/lang/cs/Source/WAKE/Wake/Util/Disposable.cs new file mode 100644 index 0000000..ebe30bc --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Util/Disposable.cs @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Org.Apache.Reef.Wake.Util +{ + /// <summary> + /// Generates IDisposables from a factory method + /// </summary> + internal class Disposable : IDisposable + { + private Action _disposeFunction; + private bool _disposed; + + private Disposable(Action disposeFunction) + { + _disposeFunction = disposeFunction; + _disposed = false; + } + + /// <summary> + /// Factory method to create an IDisposable from a function. + /// </summary> + /// <param name="disposeFunction">The function to call when disposing</param> + /// <returns>An IDisposable from the given dispose function</returns> + public static IDisposable Create(Action disposeFunction) + { + return new Disposable(disposeFunction); + } + + /// <summary> + /// Dispose of resources by calling the supplied dispose function + /// </summary> + public void Dispose() + { + if (!_disposed) + { + _disposeFunction(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Util/FixedThreadPoolTaskService.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Util/FixedThreadPoolTaskService.cs b/lang/cs/Source/WAKE/Wake/Util/FixedThreadPoolTaskService.cs new file mode 100644 index 0000000..e86820d --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Util/FixedThreadPoolTaskService.cs @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.Reef.Utilities.Diagnostics; +using Org.Apache.Reef.Utilities.Logging; +using Org.Apache.Reef.Wake.Time.Runtime.Event; + +namespace Org.Apache.Reef.Wake.Util +{ + public class FixedThreadPoolTaskService : ITaskService + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(FixedThreadPoolTaskService)); + + TaskFactory factory; + + List<Task> tasks = new List<Task>(); + bool shuttingDown; + + internal FixedThreadPoolTaskService(int maxDegreeOfParallelism) + { + LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(maxDegreeOfParallelism); + factory = new TaskFactory(lcts); + } + + public bool AwaitTermination(long n, TimeSpan unit) + { + Task[] allTasks; + lock (tasks) + { + if (tasks.Count == 0) + { + return true; + } + allTasks = tasks.ToArray(); + } + return Task.WaitAll(allTasks, unit); + } + + public void ShutdownNow() + { + Shutdown(); + } + + public void Shutdown() + { + lock (tasks) + { + shuttingDown = true; + } + } + + public Task<T> Submit<T>(Func<T> c) + { + Task<T> task = null; + lock (tasks) + { + if (shuttingDown) + { + Exceptions.Throw(new InvalidOperationException("Shutting down"), LOGGER); + } + + CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); + CancellationToken cancellationToken = cancellationTokenSource.Token; + task = factory.StartNew(c, cancellationToken); + tasks.Add(task); + } + return task; + } + + public void Execute(ThreadStart threadStart) + { + new Actionable(threadStart).Call(); + } + + internal void RemoveTask(Task task) + { + lock (tasks) + { + tasks.Remove(task); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Util/IStartable.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Util/IStartable.cs b/lang/cs/Source/WAKE/Wake/Util/IStartable.cs new file mode 100644 index 0000000..2327819 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Util/IStartable.cs @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Org.Apache.Reef.Wake.Util +{ + public interface IStartable + { + void Start(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Util/ITaskService.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Util/ITaskService.cs b/lang/cs/Source/WAKE/Wake/Util/ITaskService.cs new file mode 100644 index 0000000..2ad0e1e --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Util/ITaskService.cs @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Org.Apache.Reef.Wake.Util +{ + public interface ITaskService + { + void Shutdown(); + + void Execute(ThreadStart threadStart); + + Task<T> Submit<T>(Func<T> ob); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Util/LimitedConcurrencyLevelTaskScheduler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Util/LimitedConcurrencyLevelTaskScheduler.cs b/lang/cs/Source/WAKE/Wake/Util/LimitedConcurrencyLevelTaskScheduler.cs new file mode 100644 index 0000000..4bd3083 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Util/LimitedConcurrencyLevelTaskScheduler.cs @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.Reef.Utilities.Diagnostics; +using Org.Apache.Reef.Utilities.Logging; + +namespace Org.Apache.Reef.Wake.Util +{ + internal class LimitedConcurrencyLevelTaskScheduler : TaskScheduler + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(LimitedConcurrencyLevelTaskScheduler)); + + /// <summary>Whether the current thread is processing work items.</summary> + [ThreadStatic] + private static bool _currentThreadIsProcessingItems; + + /// <summary>The list of tasks to be executed.</summary> + private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks) + + /// <summary>The maximum concurrency level allowed by this scheduler.</summary> + private readonly int _maxDegreeOfParallelism; + + /// <summary>Whether the scheduler is currently processing work items.</summary> + private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks) + + /// <summary> + /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the + /// specified degree of parallelism. + /// </summary> + /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param> + public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism) + { + if (maxDegreeOfParallelism < 1) + { + Exceptions.Throw(new ArgumentOutOfRangeException("maxDegreeOfParallelism"), LOGGER); + } + _maxDegreeOfParallelism = maxDegreeOfParallelism; + } + + /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary> + public sealed override int MaximumConcurrencyLevel + { + get + { + return _maxDegreeOfParallelism; + } + } + + /// <summary>Queues a task to the scheduler.</summary> + /// <param name="task">The task to be queued.</param> + protected sealed override void QueueTask(Task task) + { + // Add the task to the list of tasks to be processed. If there aren't enough + // delegates currently queued or running to process tasks, schedule another. + lock (_tasks) + { + _tasks.AddLast(task); + if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism) + { + ++_delegatesQueuedOrRunning; + NotifyThreadPoolOfPendingWork(); + } + } + } + + /// <summary>Attempts to execute the specified task on the current thread.</summary> + /// <param name="task">The task to be executed.</param> + /// <param name="taskWasPreviouslyQueued"></param> + /// <returns>Whether the task could be executed on the current thread.</returns> + protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) + { + // If this thread isn't already processing a task, we don't support inlining + if (!_currentThreadIsProcessingItems) + { + return false; + } + + // If the task was previously queued, remove it from the queue + if (taskWasPreviouslyQueued) + { + TryDequeue(task); + } + + // Try to run the task. + return TryExecuteTask(task); + } + + /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary> + /// <param name="task">The task to be removed.</param> + /// <returns>Whether the task could be found and removed.</returns> + protected sealed override bool TryDequeue(Task task) + { + lock (_tasks) + { + return _tasks.Remove(task); + } + } + + /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary> + /// <returns>An enumerable of the tasks currently scheduled.</returns> + protected sealed override IEnumerable<Task> GetScheduledTasks() + { + bool lockTaken = false; + try + { + Monitor.TryEnter(_tasks, ref lockTaken); + if (lockTaken) + { + return _tasks.ToArray(); + } + else + { + throw new NotSupportedException(); + } + } + finally + { + if (lockTaken) + { + Monitor.Exit(_tasks); + } + } + } + + /// <summary> + /// Informs the ThreadPool that there's work to be executed for this scheduler. + /// </summary> + private void NotifyThreadPoolOfPendingWork() + { + ThreadPool.UnsafeQueueUserWorkItem(_ => + { + // Note that the current thread is now processing work items. + // This is necessary to enable inlining of tasks into this thread. + _currentThreadIsProcessingItems = true; + try + { + // Process all available items in the queue. + while (true) + { + Task item; + lock (_tasks) + { + // When there are no more items to be processed, + // note that we're done processing, and get out. + if (_tasks.Count == 0) + { + --_delegatesQueuedOrRunning; + break; + } + // Get the next item from the queue + item = _tasks.First.Value; + _tasks.RemoveFirst(); + } + // Execute the task we pulled out of the queue + base.TryExecuteTask(item); + } + } + // We're done processing items on the current thread + finally + { + _currentThreadIsProcessingItems = false; + } + }, null); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Util/NetworkUtils.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Util/NetworkUtils.cs b/lang/cs/Source/WAKE/Wake/Util/NetworkUtils.cs new file mode 100644 index 0000000..c598c49 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Util/NetworkUtils.cs @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading.Tasks; + +namespace Org.Apache.Reef.Wake.Util +{ + public class NetworkUtils + { + private static IPAddress _localAddress; + private static Random _random = new Random(); + + /// <summary> + /// Returns the first usable IP Address for the machine. + /// </summary> + /// <returns>The machine's local IP Address</returns> + public static IPAddress LocalIPAddress + { + get + { + if (_localAddress == null) + { + IPAddress[] localIps = Dns.GetHostAddresses(Dns.GetHostName()); + _localAddress = localIps.Where(i => i.AddressFamily.Equals(AddressFamily.InterNetwork)) + .OrderBy(ip => ip.ToString()) + .First(); + } + + return _localAddress; + } + } + + /// <summary> + /// Generate a random port between low (inclusive) and high (exclusive) + /// </summary> + /// <param name="low">The inclusive lower bound of the of the port range</param> + /// <param name="high">The exclusive upper bound of the port range</param> + /// <returns>The randomly generated port</returns> + public static int GenerateRandomPort(int low, int high) + { + return _random.Next(low, high); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Util/SerializationHelper.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Util/SerializationHelper.cs b/lang/cs/Source/WAKE/Wake/Util/SerializationHelper.cs new file mode 100644 index 0000000..cd9e220 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Util/SerializationHelper.cs @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using ProtoBuf; +using System; +using System.IO; +using System.Runtime.Serialization; + +namespace Org.Apache.Reef.Wake.Util +{ + public class SerializationHelper + { + public static byte[] Serialize<T>(T t) + { + using (var s = new MemoryStream()) + { + Serializer.Serialize(s, t); + return s.ToArray(); + } + } + + public static T Deserialize<T>(byte[] bytes) + { + using (var s = new MemoryStream(bytes)) + { + return Serializer.Deserialize<T>(s); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Util/TaskExtensions.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Util/TaskExtensions.cs b/lang/cs/Source/WAKE/Wake/Util/TaskExtensions.cs new file mode 100644 index 0000000..0c1da40 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Util/TaskExtensions.cs @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Org.Apache.Reef.Wake.Util +{ + public static class TaskExtensions + { + public static void Forget(this Task task) + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Util/TimeHelper.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Util/TimeHelper.cs b/lang/cs/Source/WAKE/Wake/Util/TimeHelper.cs new file mode 100644 index 0000000..2b437fa --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Util/TimeHelper.cs @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Threading; + +namespace Org.Apache.Reef.Wake.Util +{ + public class TimeHelper + { + public const long TicksPerMilliSecond = 10000; + public const long TicksPerMicroSecond = 10; + public const double TicksPerNanoSecond = .01; + + public static long CurrentTimeToNanoSeconds + { + get + { + return DateTime.Now.Ticks / 100; + } + } + + public static long AsLongNanoSeconds(TimeSpan timeSpan) + { + return (long)(timeSpan.Ticks * TicksPerNanoSecond); + } + + public static double AsDoubleNanoSeconds(TimeSpan timeSpan) + { + return timeSpan.Ticks * TicksPerNanoSecond; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Wake.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Wake.csproj b/lang/cs/Source/WAKE/Wake/Wake.csproj new file mode 100644 index 0000000..689df3c --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/Wake.csproj @@ -0,0 +1,214 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" /> + <PropertyGroup> + <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> + <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform> + <ProjectGuid>{CDFB3464-4041-42B1-9271-83AF24CD5008}</ProjectGuid> + <OutputType>Library</OutputType> + <AppDesignerFolder>Properties</AppDesignerFolder> + <RootNamespace>Org.Apache.Reef.Wake</RootNamespace> + <AssemblyName>Org.Apache.Reef.Wake</AssemblyName> + <TargetFrameworkVersion>v4.5</TargetFrameworkVersion> + <FileAlignment>512</FileAlignment> + <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\..\</SolutionDir> + <RestorePackages>true</RestorePackages> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugSymbols>true</DebugSymbols> + <DebugType>full</DebugType> + <Optimize>false</Optimize> + <OutputPath>..\..\..\bin\Debug\Org.Apache.Reef.Wake\</OutputPath> + <DefineConstants>DEBUG;TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugType>pdbonly</DebugType> + <Optimize>true</Optimize> + <OutputPath>..\..\..\bin\Release\Microsoft.Wake\</OutputPath> + <DefineConstants>TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <ItemGroup> + <Reference Include="protobuf-net"> + <HintPath>..\..\..\packages\protobuf-net.2.0.0.668\lib\net40\protobuf-net.dll</HintPath> + </Reference> + <Reference Include="System" /> + <Reference Include="System.Core" /> + <Reference Include="System.Reactive.Core"> + <HintPath>..\..\..\packages\Rx-Core.2.2.5\lib\net45\System.Reactive.Core.dll</HintPath> + </Reference> + <Reference Include="System.Reactive.Interfaces"> + <HintPath>..\..\..\packages\Rx-Interfaces.2.2.5\lib\net45\System.Reactive.Interfaces.dll</HintPath> + </Reference> + <Reference Include="System.Runtime.Serialization" /> + <Reference Include="System.Xml.Linq" /> + <Reference Include="System.Data.DataSetExtensions" /> + <Reference Include="Microsoft.CSharp" /> + <Reference Include="System.Data" /> + <Reference Include="System.Xml" /> + </ItemGroup> + <ItemGroup> + <Compile Include="AbstractEStage.cs" /> + <Compile Include="IEStage.cs" /> + <Compile Include="IEventHandler.cs" /> + <Compile Include="IIdentifier.cs" /> + <Compile Include="IIdentifierFactory.cs" /> + <Compile Include="Impl\LoggingEventHandler.cs" /> + <Compile Include="Impl\MissingStartHandlerHandler.cs" /> + <Compile Include="Impl\MultiEventHandler.cs" /> + <Compile Include="Impl\PeriodicEvent.cs" /> + <Compile Include="Impl\PubSubEventHandler.cs" /> + <Compile Include="Impl\SingleThreadStage.cs" /> + <Compile Include="Impl\SyncStage.cs" /> + <Compile Include="Impl\ThreadPoolStage.cs" /> + <Compile Include="Impl\TimerStage.cs" /> + <Compile Include="IObserverFactory.cs" /> + <Compile Include="IStage.cs" /> + <Compile Include="Properties\AssemblyInfo.cs" /> + <Compile Include="Protobuf\WakeRemoteProtosGen.cs" /> + <Compile Include="Remote\ICodec.cs" /> + <Compile Include="Remote\ICodecFactory.cs" /> + <Compile Include="Remote\IDecoder.cs" /> + <Compile Include="Remote\IEncoder.cs" /> + <Compile Include="Remote\ILink.cs" /> + <Compile Include="Remote\Impl\ByteCodec.cs" /> + <Compile Include="Remote\Impl\ByteCodecFactory.cs" /> + <Compile Include="Remote\Impl\Channel.cs" /> + <Compile Include="Remote\Impl\DefaultRemoteManager.cs" /> + <Compile Include="Remote\Impl\DefaultRemoteMessage.cs" /> + <Compile Include="Remote\Impl\IntCodec.cs" /> + <Compile Include="Remote\Impl\IPEndpointComparer.cs" /> + <Compile Include="Remote\Impl\Link.cs" /> + <Compile Include="Remote\Impl\MultiCodec.cs" /> + <Compile Include="Remote\Impl\MultiDecoder.cs" /> + <Compile Include="Remote\Impl\MultiEncoder.cs" /> + <Compile Include="Remote\Impl\ObserverContainer.cs" /> + <Compile Include="Remote\Impl\RemoteEvent.cs" /> + <Compile Include="Remote\Impl\RemoteEventCodec.cs" /> + <Compile Include="Remote\Impl\RemoteEventDecoder.cs" /> + <Compile Include="Remote\Impl\RemoteEventEncoder.cs" /> + <Compile Include="Remote\Impl\RemoteEventEndpoint.cs" /> + <Compile Include="Remote\Impl\SocketRemoteIdentifier.cs" /> + <Compile Include="Remote\Impl\StringCodec.cs" /> + <Compile Include="Remote\Impl\StringIdentifier.cs" /> + <Compile Include="Remote\Impl\StringIdentifierFactory.cs" /> + <Compile Include="Remote\Impl\TransportClient.cs" /> + <Compile Include="Remote\Impl\TransportEvent.cs" /> + <Compile Include="Remote\Impl\TransportServer.cs" /> + <Compile Include="Remote\IRemoteEvent.cs" /> + <Compile Include="Remote\IRemoteIdentifier.cs" /> + <Compile Include="Remote\IRemoteIdentifierFactory.cs" /> + <Compile Include="Remote\IRemoteManager.cs" /> + <Compile Include="Remote\IRemoteMessage.cs" /> + <Compile Include="Remote\ISubscriptionManager.cs" /> + <Compile Include="Remote\Proto\WakeRemoteProtos.cs" /> + <Compile Include="Remote\RemoteConfiguration.cs" /> + <Compile Include="Remote\RemoteRuntimeException.cs" /> + <Compile Include="RX\AbstractObserver.cs" /> + <Compile Include="RX\AbstractRxStage.cs" /> + <Compile Include="RX\Impl\PubSubSubject.cs" /> + <Compile Include="RX\Impl\RxSyncStage.cs" /> + <Compile Include="RX\Impl\RxThreadPoolStage.cs" /> + <Compile Include="RX\Impl\RxTimerStage.cs" /> + <Compile Include="RX\Impl\SimpleSubject.cs" /> + <Compile Include="RX\IRxStage.cs" /> + <Compile Include="RX\IStaticObservable.cs" /> + <Compile Include="RX\ISubject.cs" /> + <Compile Include="RX\ObserverCompletedException.cs" /> + <Compile Include="src\main\cs\Examples\P2p\IEventSource.cs" /> + <Compile Include="src\main\cs\Examples\P2p\Pull2Push.cs" /> + <Compile Include="src\main\cs\PeriodicEvent.cs" /> + <Compile Include="Time\Event\Alarm.cs" /> + <Compile Include="Time\Event\StartTime.cs" /> + <Compile Include="Time\Event\StopTime.cs" /> + <Compile Include="Time\IClock.cs" /> + <Compile Include="Time\Runtime\Event\ClientAlarm.cs" /> + <Compile Include="Time\Runtime\Event\IdleClock.cs" /> + <Compile Include="Time\Runtime\Event\RuntimeAlarm.cs" /> + <Compile Include="Time\Runtime\Event\RuntimeStart.cs" /> + <Compile Include="Time\Runtime\Event\RuntimeStop.cs" /> + <Compile Include="Time\Runtime\ITimer.cs" /> + <Compile Include="Time\Runtime\LogicalTimer.cs" /> + <Compile Include="Time\Runtime\RealTimer.cs" /> + <Compile Include="Time\Runtime\RuntimeClock.cs" /> + <Compile Include="Time\Time.cs" /> + <Compile Include="Util\Actionable.cs" /> + <Compile Include="Util\Disposable.cs" /> + <Compile Include="Util\FixedThreadPoolTaskService.cs" /> + <Compile Include="Util\IStartable.cs" /> + <Compile Include="Util\ITaskService.cs" /> + <Compile Include="Util\LimitedConcurrencyLevelTaskScheduler.cs" /> + <Compile Include="Util\NetworkUtils.cs" /> + <Compile Include="Util\SerializationHelper.cs" /> + <Compile Include="Util\TaskExtensions.cs" /> + <Compile Include="Util\TimeHelper.cs" /> + <Compile Include="WakeRuntimeException.cs" /> + </ItemGroup> + <ItemGroup> + <None Include="packages.config" /> + <None Include="Protobuf\RemoteProtocol.proto" /> + <None Include="testkey.snk" /> + </ItemGroup> + <ItemGroup> + <Folder Include="Impl\Impl\" /> + <Folder Include="RX\RX\Impl\" /> + <Folder Include="Time\Time\Event\" /> + <Folder Include="Time\Time\Runtime\Event\" /> + <Folder Include="Util\Util\" /> + </ItemGroup> + <ItemGroup> + <ProjectReference Include="..\..\Tang\Tang\Tang.csproj"> + <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project> + <Name>Tang</Name> + </ProjectReference> + <ProjectReference Include="..\..\Utilities\Utilities.csproj"> + <Project>{79e7f89a-1dfb-45e1-8d43-d71a954aeb98}</Project> + <Name>Utilities</Name> + </ProjectReference> + </ItemGroup> + <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> + <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" /> + <PropertyGroup> + <MainProtoBufDir>.\target\generated-sources\proto\main\cs\Wake</MainProtoBufDir> + <TestProtoBufDir>.\target\generated-sources\proto\test\cs\Wake</TestProtoBufDir> + <TestRemoteDir>..\..\wake\target\generated-sources\proto\test\cs\Wake</TestRemoteDir> + </PropertyGroup> + <Target Name="ProtoBuf"> + <MakeDir Directories="$(MainProtoBufDir)" Condition="!Exists('$(MainProtoBufDir)')" /> + <MakeDir Directories="$(TestProtoBufDir)" Condition="!Exists('$(TestProtoBufDir)')" /> + <MakeDir Directories="$(TestRemoteDir)" Condition="!Exists('$(TestRemoteDir)')" /> + <Exec Command="protogen -i:.\src\main\proto\RemoteProtocol.proto -o:$(MainProtoBufDir)\WakeRemoteProtosGen.cs -ns:Wake.Remote.Proto.WakeRemoteProtos" /> + <!--<Exec Command="protogen -i:.\src\test\proto\TestProtocol.proto -o:$(TestProtoBufDir)\TestProtosGen.cs -ns:Wake.Test.Proto.TestProtos" /> + <Exec Command="protogen -i:.\src\test\proto\TestEvent1.proto -o:$(TestRemoteDir)\TestEvent1.pb.cs -ns:Wake.Test.Remote.TestRemote" />--> + </Target> + <!-- To modify your build process, add your task inside one of the targets below and uncomment it. + Other similar extension points exist, see Microsoft.Common.targets. + <Target Name="BeforeBuild"> + </Target> + <Target Name="AfterBuild"> + </Target> + --> +</Project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/WakeRuntimeException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/WakeRuntimeException.cs b/lang/cs/Source/WAKE/Wake/WakeRuntimeException.cs new file mode 100644 index 0000000..0ac2c13 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/WakeRuntimeException.cs @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; + +namespace Org.Apache.Reef.Wake +{ + /// <summary>Wake runtime exception</summary> + [System.Serializable] + public class WakeRuntimeException : Exception + { + private const long serialVersionUID = 1L; + + /// <summary>Constructs a new runtime wake exception with the specified detail message and cause + /// </summary> + /// <param name="s">the detailed message</param> + /// <param name="e">the cause</param> + public WakeRuntimeException(string s, Exception e) + : base(s, e) + { + } + + /// <summary>Constructs a new runtime stage exception with the specified detail message + /// </summary> + /// <param name="s">the detailed message</param> + public WakeRuntimeException(string s) + : base(s) + { + } + + /// <summary>Constructs a new runtime stage exception with the specified cause</summary> + /// <param name="e">the cause</param> + public WakeRuntimeException(Exception e) + : base("Runtime Exception", e) + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/packages.config ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/packages.config b/lang/cs/Source/WAKE/Wake/packages.config new file mode 100644 index 0000000..fd78097 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/packages.config @@ -0,0 +1,24 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<packages> + <package id="protobuf-net" version="2.0.0.668" targetFramework="net45" /> + <package id="Rx-Core" version="2.2.5" targetFramework="net45" /> + <package id="Rx-Interfaces" version="2.2.5" targetFramework="net45" /> +</packages> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/src/main/cs/Examples/P2p/IEventSource.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/src/main/cs/Examples/P2p/IEventSource.cs b/lang/cs/Source/WAKE/Wake/src/main/cs/Examples/P2p/IEventSource.cs new file mode 100644 index 0000000..d761032 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/src/main/cs/Examples/P2p/IEventSource.cs @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Wake.Examples.P2p +{ + /// <summary> + /// The pull side of the interface: Clients implement this and register it with + /// the PullToPush class. + /// </summary> + /// <typeparam name="T">The event type</typeparam> + public interface IEventSource<T> + { + /// <summary> + /// Gets the next event + /// </summary> + /// <returns>The next event</returns> + T GetNext(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/src/main/cs/Examples/P2p/Pull2Push.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/src/main/cs/Examples/P2p/Pull2Push.cs b/lang/cs/Source/WAKE/Wake/src/main/cs/Examples/P2p/Pull2Push.cs new file mode 100644 index 0000000..3f596f2 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/src/main/cs/Examples/P2p/Pull2Push.cs @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.Reef.Utilities.Logging; +using Org.Apache.Reef.Wake; +using Org.Apache.Reef.Wake.Util; +using System; +using System.Collections.Generic; + +namespace Wake.Examples.P2p +{ + /// <summary>Performs a Pull-to-Push conversion in Wake.</summary> + /// <remarks> + /// Performs a Pull-to-Push conversion in Wake. + /// The class pulls from a set of event sources, and pushes to a single + /// EventHandler. If the downstream event handler blocks, this will block, + /// providing a simple rate limiting scheme. + /// The EventSources are managed in a basic Queue. + /// </remarks> + public sealed class Pull2Push<T> : IStartable, IDisposable + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(Pull2Push<T>)); + + private readonly IEventHandler<T> _output; + + private readonly Queue<IEventSource<T>> _sources = new Queue<IEventSource<T>>(); + + private bool _closed = false; + + /// <summary> + /// Constructs a new Pull2Push object + /// </summary> + /// <param name="output"> + /// the EventHandler that receives the messages from this + /// Pull2Push. + /// </param> + public Pull2Push(IEventHandler<T> output) + { + // The downstream EventHandler + // The upstream event sources + _output = output; + } + + /// <summary>Registers an event source.</summary> + /// <param name="source"> + /// The source that will be added to the queue of this + /// Pull2Push + /// </param> + public void Register(IEventSource<T> source) + { + _sources.Enqueue(source); + } + + /// <summary>Executes the message loop.</summary> + public void Start() + { + while (!_closed) + { + // Grab the next available message source, if any + IEventSource<T> nextSource = _sources.Dequeue(); + if (null != nextSource) + { + // Grab the next message from that source, if any + T message = nextSource.GetNext(); + if (null != message) + { + // Add the source to the end of the queue again. + _sources.Enqueue(nextSource); + // Send the message. Note that this may block depending on the underlying EventHandler. + _output.OnNext(message); + } + else + { + // The message source has returned null as the next message. We drop the message source in that case. + LOGGER.Log(Level.Info, "Droping message source {0} from the queue " + nextSource.ToString()); + } + } + } + } + + // No source where available. We could put a wait() here. + public void Dispose() + { + _closed = true; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/src/main/cs/PeriodicEvent.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/src/main/cs/PeriodicEvent.cs b/lang/cs/Source/WAKE/Wake/src/main/cs/PeriodicEvent.cs new file mode 100644 index 0000000..a91e298 --- /dev/null +++ b/lang/cs/Source/WAKE/Wake/src/main/cs/PeriodicEvent.cs @@ -0,0 +1,23 @@ +/** + * Copyright 2013 Microsoft. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Wake.Impl +{ + /// <summary>Periodic event for timers</summary> + public class PeriodicEvent + { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/testkey.snk ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/testkey.snk b/lang/cs/Source/WAKE/Wake/testkey.snk new file mode 100644 index 0000000..133423f Binary files /dev/null and b/lang/cs/Source/WAKE/Wake/testkey.snk differ http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Tests/ReefTests/ConfigFiles/evaluator.conf ---------------------------------------------------------------------- diff --git a/lang/cs/Tests/ReefTests/ConfigFiles/evaluator.conf b/lang/cs/Tests/ReefTests/ConfigFiles/evaluator.conf new file mode 100644 index 0000000..67256f5 Binary files /dev/null and b/lang/cs/Tests/ReefTests/ConfigFiles/evaluator.conf differ http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Tests/ReefTests/Evaluator.Tests/EvaluatorConfigurationsTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Tests/ReefTests/Evaluator.Tests/EvaluatorConfigurationsTests.cs b/lang/cs/Tests/ReefTests/Evaluator.Tests/EvaluatorConfigurationsTests.cs new file mode 100644 index 0000000..69fc9ae --- /dev/null +++ b/lang/cs/Tests/ReefTests/Evaluator.Tests/EvaluatorConfigurationsTests.cs @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.Reef.Common; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace Org.Apache.Reef.Test +{ + [TestClass] + public class EvaluatorConfigurationsTests + { + [TestMethod, Priority(0), TestCategory("Unit")] + [DeploymentItem(@"ConfigFiles")] + public void TestEvaluatorConfigurations() + { + EvaluatorConfigurations evaluatorConfigurations = new EvaluatorConfigurations("evaluator.conf"); + + Assert.IsTrue(evaluatorConfigurations.EvaluatorId.Equals("Node-1-1414443998204")); + + Assert.IsTrue(evaluatorConfigurations.ApplicationId.Equals("REEF_LOCAL_RUNTIME")); + + string rootContextConfigString = evaluatorConfigurations.RootContextConfiguration; + Assert.IsFalse(string.IsNullOrWhiteSpace(rootContextConfigString)); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Tests/ReefTests/Evaluator.Tests/EvaluatorTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Tests/ReefTests/Evaluator.Tests/EvaluatorTests.cs b/lang/cs/Tests/ReefTests/Evaluator.Tests/EvaluatorTests.cs new file mode 100644 index 0000000..2c3eaa2 --- /dev/null +++ b/lang/cs/Tests/ReefTests/Evaluator.Tests/EvaluatorTests.cs @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.Reef.Common.Avro; +using Org.Apache.Reef.Common.Evaluator; +using Org.Apache.Reef.Tasks; +using Org.Apache.Reef.Tang.Formats; +using Org.Apache.Reef.Tang.Implementations; +using Org.Apache.Reef.Tang.Interface; +using Org.Apache.Reef.Tang.Util; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using System; +using System.IO; + +namespace Org.Apache.Reef.Test +{ + [TestClass] + public class EvaluatorTests + { + [TestMethod, Priority(0), TestCategory("Functional")] + [Description("Parse Evaluator configuration from Java, inject and execute Shell task with DIR command based on the configuration")] + [DeploymentItem(@"ConfigFiles")] + public void CanInjectAndExecuteTask() + { + //To enforce that shell task dll be copied to output directory. + ShellTask tmpTask = new ShellTask("invalid"); + Assert.IsNotNull(tmpTask); + + string tmp = Directory.GetCurrentDirectory(); + Assert.IsNotNull(tmp); + + AvroConfigurationSerializer serializer = new AvroConfigurationSerializer(); + AvroConfiguration avroConfiguration = serializer.AvroDeseriaizeFromFile("evaluator.conf"); + Assert.IsNotNull(avroConfiguration); + + ICsConfigurationBuilder cb = TangFactory.GetTang().NewConfigurationBuilder(); + cb.AddConfiguration(TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, "Test_CLRContext_task") + .Set(TaskConfiguration.Task, GenericType<ShellTask>.Class) + .Build()); + cb.BindNamedParameter<ShellTask.Command, string>(GenericType<ShellTask.Command>.Class, "dir"); + + IConfiguration taskConfiguration = cb.Build(); + + string taskConfig = serializer.ToString(taskConfiguration); + + ITask task = null; + TaskConfiguration config = new TaskConfiguration(taskConfig); + Assert.IsNotNull(config); + try + { + IInjector injector = TangFactory.GetTang().NewInjector(config.TangConfig); + task = (ITask)injector.GetInstance(typeof(ITask)); + } + catch (Exception e) + { + throw new InvalidOperationException("unable to inject task with configuration: " + taskConfig, e); + } + + byte[] bytes = task.Call(null); + string result = System.Text.Encoding.Default.GetString(bytes); + + //a dir command is executed in the container directory, which includes the file "evaluator.conf" + Assert.IsTrue(result.Contains("evaluator.conf")); + } + + [TestMethod, Priority(0), TestCategory("Unit")] + [Description("Test driver information extacted from Http server")] + public void CanExtractDriverInformaiton() + { + const string InfoString = "{\"remoteId\":\"socket://10.121.136.231:14272\",\"startTime\":\"2014 08 28 10:50:32\",\"services\":[{\"serviceName\":\"NameServer\",\"serviceInfo\":\"10.121.136.231:16663\"}]}"; + AvroDriverInfo info = AvroJsonSerializer<AvroDriverInfo>.FromString(InfoString); + Assert.IsTrue(info.remoteId.Equals("socket://10.121.136.231:14272")); + Assert.IsTrue(info.startTime.Equals("2014 08 28 10:50:32")); + Assert.IsTrue(new DriverInformation(info.remoteId, info.startTime, info.services).NameServerId.Equals("10.121.136.231:16663")); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Tests/ReefTests/Functional.Tests/Bridge/TestBridgeClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Tests/ReefTests/Functional.Tests/Bridge/TestBridgeClient.cs b/lang/cs/Tests/ReefTests/Functional.Tests/Bridge/TestBridgeClient.cs new file mode 100644 index 0000000..f0785f9 --- /dev/null +++ b/lang/cs/Tests/ReefTests/Functional.Tests/Bridge/TestBridgeClient.cs @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.Reef.Driver; +using Org.Apache.Reef.Utilities.Logging; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; + +namespace Org.Apache.Reef.Test +{ + [TestClass] + public class TestBridgeClient : ReefFunctionalTest + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(TestBridgeClient)); + + [TestInitialize()] + public void TestSetup() + { + CleanUp(); + Init(); + } + + [TestCleanup] + public void TestCleanup() + { + Console.WriteLine("Post test check and clean up"); + CleanUp(); + } + + [TestMethod, Priority(1), TestCategory("FunctionalGated")] + [Description("Run CLR Bridge on local runtime")] + [DeploymentItem(@".")] + [Ignore] // This is diabled by default on builds + public void CanRunClrBridgeOnYarn() + { + RunClrBridgeClient(runOnYarn: true); + } + + [TestMethod, Priority(1), TestCategory("FunctionalGated")] + [Description("Run CLR Bridge on local runtime")] + [DeploymentItem(@".")] + [Timeout(180 * 1000)] + public void CanRunClrBridgeOnLocalRuntime() + { + IsOnLocalRuntiime = true; + RunClrBridgeClient(runOnYarn: false); + ValidateSuccessForLocalRuntime(2); + } + + private void RunClrBridgeClient(bool runOnYarn) + { + const string clrBridgeClient = "Org.Apache.Reef.CLRBridgeClient.exe"; + List<string> arguments = new List<string>(); + arguments.Add(runOnYarn.ToString()); + arguments.Add(Constants.BridgeLaunchClass); + arguments.Add("."); + arguments.Add(Path.Combine(_binFolder, Constants.BridgeJarFileName)); + arguments.Add(Path.Combine(_binFolder, _cmdFile)); + + ProcessStartInfo startInfo = new ProcessStartInfo() + { + FileName = clrBridgeClient, + Arguments = string.Join(" ", arguments), + RedirectStandardOutput = true, + UseShellExecute = false, + CreateNoWindow = false + }; + + LOGGER.Log(Level.Info, "executing\r\n" + startInfo.FileName + "\r\n" + startInfo.Arguments); + using (Process process = Process.Start(startInfo)) + { + process.WaitForExit(); + if (process.ExitCode != 0) + { + throw new InvalidOperationException("CLR client exited with error code " + process.ExitCode); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Tests/ReefTests/Functional.Tests/Bridge/TestHelloBridgeHandlers.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Tests/ReefTests/Functional.Tests/Bridge/TestHelloBridgeHandlers.cs b/lang/cs/Tests/ReefTests/Functional.Tests/Bridge/TestHelloBridgeHandlers.cs new file mode 100644 index 0000000..7e896ec --- /dev/null +++ b/lang/cs/Tests/ReefTests/Functional.Tests/Bridge/TestHelloBridgeHandlers.cs @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using Org.Apache.Reef.Driver.Bridge; +using Org.Apache.Reef.Driver.Defaults; +using Org.Apache.Reef.Examples.HelloCLRBridge; +using Org.Apache.Reef.Examples.HelloCLRBridge.Handlers; +using Org.Apache.Reef.Tasks; +using Org.Apache.Reef.Utilities.Logging; +using Org.Apache.Reef.Tang.Interface; +using Org.Apache.Reef.Tang.Util; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace Org.Apache.Reef.Test +{ + [TestClass] + public class TestHelloBridgeHandlers : ReefFunctionalTest + { + [TestInitialize()] + public void TestSetup() + { + CleanUp(); + Init(); + } + + [TestCleanup] + public void TestCleanup() + { + Console.WriteLine("Post test check and clean up"); + CleanUp(); + } + + [TestMethod, Priority(1), TestCategory("FunctionalGated")] + [Description("Test Hello Handler on local runtime")] + [DeploymentItem(@".")] + [Timeout(180 * 1000)] + public void RunHelloHandlerOnLocalRuntime() + { + IsOnLocalRuntiime = true; + TestRun(AssembliesToCopy(), DriverConfiguration()); + ValidateSuccessForLocalRuntime(2); + ValidateEvaluatorSetting(); + } + + public IConfiguration DriverConfiguration() + { + return DriverBridgeConfiguration.ConfigurationModule + .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<HelloStartHandler>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<HelloAllocatedEvaluatorHandler>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<AnotherHelloAllocatedEvaluatorHandler>.Class) + .Set(DriverBridgeConfiguration.OnContextActive, GenericType<HelloActiveContextHandler>.Class) + .Set(DriverBridgeConfiguration.OnTaskMessage, GenericType<HelloTaskMessageHandler>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<HelloFailedEvaluatorHandler>.Class) + .Set(DriverBridgeConfiguration.OnTaskFailed, GenericType<HelloFailedTaskHandler>.Class) + .Set(DriverBridgeConfiguration.OnTaskRunning, GenericType<HelloRunningTaskHandler>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<HelloEvaluatorRequestorHandler>.Class) + .Set(DriverBridgeConfiguration.OnHttpEvent, GenericType<HelloHttpHandler>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorCompleted, GenericType<HelloCompletedEvaluatorHandler>.Class) + .Set(DriverBridgeConfiguration.CustomTraceListeners, GenericType<DefaultCustomTraceListener>.Class) + .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString()) + .Set(DriverBridgeConfiguration.CommandLineArguments, "submitContextAndTask") + .Build(); + } + + public HashSet<string> AssembliesToCopy() + { + HashSet<string> appDlls = new HashSet<string>(); + appDlls.Add(typeof(HelloStartHandler).Assembly.GetName().Name); + appDlls.Add(typeof(HelloTask).Assembly.GetName().Name); + return appDlls; + } + + private void ValidateEvaluatorSetting() + { + const string successIndication = "Evaluator is assigned with 512 MB of memory and 2 cores."; + string[] lines = File.ReadAllLines(GetLogFile(_stdout)); + string[] successIndicators = lines.Where(s => s.Contains(successIndication)).ToArray(); + Assert.IsTrue(successIndicators.Count() >= 1); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Tests/ReefTests/Functional.Tests/Bridge/TestSimpleEventHandlers.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Tests/ReefTests/Functional.Tests/Bridge/TestSimpleEventHandlers.cs b/lang/cs/Tests/ReefTests/Functional.Tests/Bridge/TestSimpleEventHandlers.cs new file mode 100644 index 0000000..fb8a011 --- /dev/null +++ b/lang/cs/Tests/ReefTests/Functional.Tests/Bridge/TestSimpleEventHandlers.cs @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using Org.Apache.Reef.Common.Evaluator; +using Org.Apache.Reef.Driver.Bridge; +using Org.Apache.Reef.Driver.Defaults; +using Org.Apache.Reef.Examples.HelloCLRBridge; +using Org.Apache.Reef.Examples.HelloCLRBridge.handlers; +using Org.Apache.Reef.Examples.HelloCLRBridge.Handlers; +using Org.Apache.Reef.IO.Network.Naming; +using Org.Apache.Reef.Tasks; +using Org.Apache.Reef.Utilities.Logging; +using Org.Apache.Reef.Tang.Interface; +using Org.Apache.Reef.Tang.Util; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace Org.Apache.Reef.Test +{ + [TestClass] + public class TestSimpleEventHandlers : ReefFunctionalTest + { + [TestInitialize()] + public void TestSetup() + { + CleanUp(); + Init(); + } + + [TestCleanup] + public void TestCleanup() + { + Console.WriteLine("Post test check and clean up"); + CleanUp(); + } + + //[TestMethod, Priority(1), TestCategory("FunctionalGated")] + [Description("Test Hello Handler on local runtime")] + [DeploymentItem(@".")] + [Timeout(180 * 1000)] + public void RunSimpleEventHandlerOnLocalRuntime() + { + IsOnLocalRuntiime = true; + TestRun(AssembliesToCopy(), DriverConfiguration()); + ValidateSuccessForLocalRuntime(2); + ValidateEvaluatorSetting(); + } + + public IConfiguration DriverConfiguration() + { + return DriverBridgeConfiguration.ConfigurationModule + .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<HelloSimpleEventHandlers>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<HelloSimpleEventHandlers>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<AnotherHelloAllocatedEvaluatorHandler>.Class) + .Set(DriverBridgeConfiguration.OnContextActive, GenericType<HelloSimpleEventHandlers>.Class) + .Set(DriverBridgeConfiguration.OnTaskMessage, GenericType<HelloTaskMessageHandler>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<HelloSimpleEventHandlers>.Class) + .Set(DriverBridgeConfiguration.OnTaskCompleted, GenericType<HelloSimpleEventHandlers>.Class) + .Set(DriverBridgeConfiguration.OnTaskFailed, GenericType<HelloSimpleEventHandlers>.Class) + .Set(DriverBridgeConfiguration.OnTaskRunning, GenericType<HelloSimpleEventHandlers>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<HelloSimpleEventHandlers>.Class) + .Set(DriverBridgeConfiguration.OnHttpEvent, GenericType<HelloSimpleEventHandlers>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorCompleted, GenericType<HelloSimpleEventHandlers>.Class) + .Set(DriverBridgeConfiguration.CustomTraceListeners, GenericType<DefaultCustomTraceListener>.Class) + .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString()) + .Set(DriverBridgeConfiguration.CommandLineArguments, "submitContextAndTask") + .Set(DriverBridgeConfiguration.OnDriverRestarted, GenericType<HelloRestartHandler>.Class) + .Set(DriverBridgeConfiguration.OnDriverReconnect, GenericType<DefaultLocalHttpDriverConnection>.Class) + .Set(DriverBridgeConfiguration.OnDirverRestartContextActive, GenericType<HelloDriverRestartActiveContextHandler>.Class) + .Set(DriverBridgeConfiguration.OnDriverRestartTaskRunning, GenericType<HelloDriverRestartRunningTaskHandler>.Class) + .Build(); + } + + public HashSet<string> AssembliesToCopy() + { + HashSet<string> appDlls = new HashSet<string>(); + appDlls.Add(typeof(HelloSimpleEventHandlers).Assembly.GetName().Name); + appDlls.Add(typeof(HelloTask).Assembly.GetName().Name); + appDlls.Add(typeof(INameServer).Assembly.GetName().Name); + return appDlls; + } + + private void ValidateEvaluatorSetting() + { + const string successIndication = "Evaluator is assigned with 512 MB of memory and 2 cores."; + string[] lines = File.ReadAllLines(GetLogFile(_stdout)); + string[] successIndicators = lines.Where(s => s.Contains(successIndication)).ToArray(); + Assert.IsTrue(successIndicators.Count() >= 1); + } + } +} \ No newline at end of file
