Repository: reef Updated Branches: refs/heads/master 51cc2cb3e -> d5679922d
[REEF-1133] Fix the implementation of `Time.CompareTo` in Wake. This addresses the issue by * Implementing GetHashCode and CompareTo only based on long TimeStamp. * Adding a comment in Equals regarding comparing subclasses of Time. * Adding a PriorityQueue implementation. * Changing RuntimeClock to use PriorityQueue instead of SortedSet. * Adding tests for PriorityQueue and Time. JIRA: [REEF-1133](https://issues.apache.org/jira/browse/REEF-1133) Pull Request: This closes #774 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/d5679922 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/d5679922 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/d5679922 Branch: refs/heads/master Commit: d5679922df0c8452083138f947ecfe4e36ea47c5 Parents: 51cc2cb Author: Andrew Chung <[email protected]> Authored: Fri Jan 15 13:52:26 2016 -0800 Committer: Markus Weimer <[email protected]> Committed: Thu Jan 21 10:12:18 2016 -0800 ---------------------------------------------------------------------- .../Org.Apache.REEF.Tests.csproj | 1 + .../Utility/TestPriorityQueue.cs | 218 ++++++++++++++++++ .../Collections/PriorityQueue.cs | 225 +++++++++++++++++++ .../Org.Apache.Reef.Utilities.csproj | 1 + .../Org.Apache.REEF.Wake.Tests.csproj | 3 +- lang/cs/Org.Apache.REEF.Wake.Tests/TimeTest.cs | 69 ++++++ .../Time/Runtime/RuntimeClock.cs | 9 +- lang/cs/Org.Apache.REEF.Wake/Time/Time.cs | 31 +-- 8 files changed, 532 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/d5679922/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index 5bcb19b..4e6f93b 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ 
b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -112,6 +112,7 @@ under the License. <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="Utility\TestDriverConfigGenerator.cs" /> <Compile Include="Utility\TestExceptions.cs" /> + <Compile Include="Utility\TestPriorityQueue.cs" /> </ItemGroup> <ItemGroup> <None Include="$(SolutionDir)\Org.Apache.REEF.Client\run.cmd"> http://git-wip-us.apache.org/repos/asf/reef/blob/d5679922/lang/cs/Org.Apache.REEF.Tests/Utility/TestPriorityQueue.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Utility/TestPriorityQueue.cs b/lang/cs/Org.Apache.REEF.Tests/Utility/TestPriorityQueue.cs new file mode 100644 index 0000000..36509f8 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Utility/TestPriorityQueue.cs @@ -0,0 +1,218 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using System; +using System.Collections.Generic; +using System.Linq; +using Org.Apache.REEF.Utilities.Collections; +using Xunit; +using Assert = Xunit.Assert; + +namespace Org.Apache.REEF.Tests.Utility +{ + public class TestPriorityQueue + { + private const int AnyInt = 2; + + [Fact] + public void PriorityQueueDequeueEmptyQueueThrowsException() + { + var q = new PriorityQueue<int>(); + Assert.Equal(0, q.Count); + Assert.Throws<InvalidOperationException>(() => q.Dequeue()); + } + + [Fact] + public void PriorityQueuePeekEmptyQueueThrowsException() + { + var q = new PriorityQueue<int>(); + Assert.Equal(0, q.Count); + Assert.Throws<InvalidOperationException>(() => q.Peek()); + } + + [Fact] + public void PriorityQueueEnqueueOneElementIncrementsCount() + { + var q = new PriorityQueue<int>(); + var count = q.Count; + q.Enqueue(AnyInt); + Assert.Equal(count + 1, q.Count); + } + + [Fact] + public void PriorityQueueDequeueOneElementDecrementsCount() + { + var q = new PriorityQueue<int>(); + q.Enqueue(AnyInt); + var count = q.Count; + q.Dequeue(); + Assert.Equal(count - 1, q.Count); + } + + [Fact] + public void PriorityQueuePeekQueueNoChangeInCount() + { + var q = new PriorityQueue<int>(); + + q.Enqueue(AnyInt); + var count = q.Count; + q.Peek(); + Assert.Equal(count, q.Count); + } + + [Fact] + public void PriorityQueueClearEmptiesTheQueue() + { + var q = new PriorityQueue<int>(); + q.Enqueue(AnyInt); + q.Enqueue(AnyInt); + q.Clear(); + + Assert.Equal(0, q.Count); + + Assert.Throws<InvalidOperationException>(() => q.Peek()); + } + + [Fact] + public void PriorityQueueAddElementsDequeueReturnsThemSortedByPriority() + { + var q = new PriorityQueue<int>(); + q.Enqueue(0); + q.Enqueue(4); + q.Enqueue(2); + q.Enqueue(3); + q.Enqueue(1); + + for (var i = 0; i < 5; i++) + { + var dequeued = q.Dequeue(); + Assert.Equal(dequeued, i); + } + } + + [Fact] + public void PriorityQueueAddElementsPeekReturnsHighestPriorityMessage() + { + var q = new PriorityQueue<int>(); + + q.Enqueue(0); 
+ q.Enqueue(4); + q.Enqueue(2); + q.Enqueue(3); + q.Enqueue(1); + + var dequeued = q.Peek(); + + Assert.Equal(dequeued, 0); + } + + [Fact] + public void PriorityQueueAddSameElements() + { + const int testSize = 100; + + var q = new PriorityQueue<int>(); + for (var i = 0; i < testSize; i++) + { + var count = q.Count; + q.Enqueue(AnyInt); + Assert.Equal(count + 1, q.Count); + } + + Assert.Equal(testSize, q.Count); + + for (var i = 0; i < testSize; i++) + { + var count = q.Count; + Assert.Equal(AnyInt, q.Dequeue()); + Assert.Equal(count - 1, q.Count); + } + + Assert.Equal(0, q.Count); + } + + [Fact] + public void PriorityQueueRandomAddElementsDequeueReturnsInPriorityOrder() + { + const int testSize = 500; + const int testTimes = 5; + var r = new Random(AnyInt); + + for (var t = 0; t < testTimes; t++) + { + var q = new PriorityQueue<int>(); + var testArr = new int[testSize]; + for (var i = 0; i < testSize; i++) + { + var num = r.Next(); + testArr[i] = num; + q.Add(num); + } + + Array.Sort(testArr); + for (var i = 0; i < testSize; i++) + { + Assert.Equal(testSize - i, q.Count); + Assert.Equal(testArr[i], q.Dequeue()); + } + + Assert.Equal(q.Count, 0); + } + } + + [Fact] + public void PriorityQueueRandomAddDequeueInterweave() + { + const int addSize = 500; + const int testTimes = 5; + var r = new Random(AnyInt); + + for (var t = 0; t < testTimes; t++) + { + var q = new PriorityQueue<int>(); + var testList = new List<int>(); + for (var i = 0; i < addSize; i++) + { + var num = r.Next(); + testList.Add(num); + q.Add(num); + + var shouldRemove = r.Next(0, 10) >= 5; + if (shouldRemove) + { + var removed = q.Dequeue(); + Assert.Equal(testList.Min(), removed); + testList.Remove(removed); + } + } + + testList.Sort(); + + var testListSize = testList.Count; + Assert.Equal(testListSize, q.Count); + + for (var i = 0; i < testListSize; i++) + { + Assert.Equal(testListSize - i, q.Count); + Assert.Equal(testList[i], q.Dequeue()); + } + + Assert.Equal(q.Count, 0); + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/reef/blob/d5679922/lang/cs/Org.Apache.REEF.Utilities/Collections/PriorityQueue.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Utilities/Collections/PriorityQueue.cs b/lang/cs/Org.Apache.REEF.Utilities/Collections/PriorityQueue.cs new file mode 100644 index 0000000..415e899 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Utilities/Collections/PriorityQueue.cs @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Collections; +using System.Collections.Generic; + +namespace Org.Apache.REEF.Utilities.Collections +{ + /// <summary> + /// A simple priority queue implementation, where the head of the queue is the + /// smallest element in the priority queue. + /// </summary> + public sealed class PriorityQueue<T> : ICollection<T> where T : IComparable<T> + { + private readonly List<T> _list; + + public PriorityQueue() + { + _list = new List<T>(); + } + + /// <summary> + /// Gets the enumerator of the priority queue. The Enumerator returns elements + /// in no guaranteed order. 
+ /// </summary> + public IEnumerator<T> GetEnumerator() + { + return _list.GetEnumerator(); + } + + /// <summary> + /// Gets the enumerator of the priority queue. The Enumerator returns elements + /// in no guaranteed order. + /// </summary> + IEnumerator IEnumerable.GetEnumerator() + { + return _list.GetEnumerator(); + } + + /// <summary> + /// Peeks at the head of the priority queue, but does not remove the element + /// at the head. + /// </summary> + /// <exception cref="InvalidOperationException">Thrown when the priority queue is empty</exception> + public T Peek() + { + if (_list.Count == 0) + { + throw new InvalidOperationException("The PriorityQueue is empty."); + } + + return _list[0]; + } + + /// <summary> + /// Adds an element to the priority queue. + /// </summary> + public void Enqueue(T item) + { + Add(item); + } + + /// <summary> + /// Dequeues an item from the priority queue. Removes the head of the priority queue, which + /// is the smallest element in the priority queue. + /// </summary> + /// <exception cref="InvalidOperationException">Thrown when the priority queue is empty</exception> + public T Dequeue() + { + if (_list.Count == 0) + { + throw new InvalidOperationException("Cannot remove from an empty PriorityQueue."); + } + + // Removes element and places last to top + var ret = _list[0]; + _list[0] = _list[_list.Count - 1]; + _list.RemoveAt(_list.Count - 1); + + if (_list.Count == 0) + { + return ret; + } + + // Percolate down. + var idx = 0; + var item = _list[0]; + var lchildIdx = GetLeftChildIndex(idx); + + while (lchildIdx < _list.Count) + { + // Find the smaller child to compare + int smallerIdx; + var rchildIdx = lchildIdx + 1; + + if (rchildIdx < _list.Count) + { + smallerIdx = _list[lchildIdx].CompareTo(_list[rchildIdx]) <= 0 ? 
lchildIdx : rchildIdx; + } + else + { + smallerIdx = lchildIdx; + } + + // Compare with the smaller child, swap down if greater + if (item.CompareTo(_list[smallerIdx]) > 0) + { + _list[idx] = _list[smallerIdx]; + _list[smallerIdx] = item; + idx = smallerIdx; + lchildIdx = GetLeftChildIndex(idx); + } + else + { + // Order holds + break; + } + } + + return ret; + } + + /// <summary> + /// <see cref="Enqueue"/>. + /// </summary> + public void Add(T item) + { + // Add item + _list.Add(item); + var idx = _list.Count - 1; + + // Percolate up + while (idx > 0) + { + var parentIdx = (idx - 1) / 2; + + // Swap up if parent is greater + if (_list[parentIdx].CompareTo(item) > 0) + { + _list[idx] = _list[parentIdx]; + _list[parentIdx] = item; + idx = parentIdx; + } + else + { + // Order holds + break; + } + } + } + + /// <summary> + /// Clears the priority queue. + /// </summary> + public void Clear() + { + _list.Clear(); + } + + /// <summary> + /// Checks if the list contains an item. + /// </summary> + public bool Contains(T item) + { + return _list.Contains(item); + } + + /// <summary> + /// Copies the priority queue to a compatible array, starting at the array's + /// provided index. + /// </summary> + public void CopyTo(T[] array, int arrayIndex) + { + _list.CopyTo(array, arrayIndex); + } + + /// <summary> + /// Remove is not supported. + /// </summary> + /// <exception cref="NotSupportedException">Operation not supported</exception> + public bool Remove(T item) + { + throw new NotSupportedException(); + } + + /// <summary> + /// Returns the count of the priority queue. + /// </summary> + public int Count + { + get + { + return _list.Count; + } + } + + /// <summary> + /// Always returns false. 
+ /// </summary> + public bool IsReadOnly + { + get { return false; } + } + + private static int GetLeftChildIndex(int idx) + { + return (2 * idx) + 1; + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d5679922/lang/cs/Org.Apache.REEF.Utilities/Org.Apache.Reef.Utilities.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Utilities/Org.Apache.Reef.Utilities.csproj b/lang/cs/Org.Apache.REEF.Utilities/Org.Apache.Reef.Utilities.csproj index b251f84..5065af6 100644 --- a/lang/cs/Org.Apache.REEF.Utilities/Org.Apache.Reef.Utilities.csproj +++ b/lang/cs/Org.Apache.REEF.Utilities/Org.Apache.Reef.Utilities.csproj @@ -47,6 +47,7 @@ under the License. <Compile Include="Attributes\UnstableAttribute.cs" /> <Compile Include="AvroUtils.cs" /> <Compile Include="ByteUtilities.cs" /> + <Compile Include="Collections\PriorityQueue.cs" /> <Compile Include="Diagnostics\DiagnosticsMessages.cs" /> <Compile Include="Diagnostics\Exceptions.cs" /> <Compile Include="IIdentifiable.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/d5679922/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj index 1acee4f..265a181 100644 --- a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj @@ -69,6 +69,7 @@ under the License. <Compile Include="RemoteManagerTest.cs" /> <Compile Include="StreamingRemoteManagerTest.cs" /> <Compile Include="StreamingTransportTest.cs" /> + <Compile Include="TimeTest.cs" /> <Compile Include="TransportTest.cs" /> </ItemGroup> <ItemGroup> @@ -104,4 +105,4 @@ under the License. 
<Target Name="AfterBuild"> </Target> --> -</Project> +</Project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/d5679922/lang/cs/Org.Apache.REEF.Wake.Tests/TimeTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/TimeTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/TimeTest.cs new file mode 100644 index 0000000..11f711e --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/TimeTest.cs @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using System; +using Org.Apache.REEF.Wake.Time.Event; +using Xunit; + +namespace Org.Apache.REEF.Wake.Tests +{ + public sealed class TimeTest + { + private const int RandomSeed = 5; + + [Fact] + public void BadTime() + { + Assert.Throws<ArgumentException>(() => new StartTime(-1)); + } + + [Fact] + public void SimpleTimeComparison() + { + long ts = new Random(RandomSeed).Next(0, 100); + var st1 = new StartTime(ts); + var st2 = new StartTime(ts); + Assert.False(ReferenceEquals(st1, st2)); + Assert.Equal(st1, st2); + Assert.Equal(st1.GetHashCode(), st2.GetHashCode()); + Assert.True(st1.CompareTo(st2) == 0); + } + + [Fact] + public void TestTimeSort() + { + const int testLen = 500; + var testArr = new Time.Time[testLen]; + var r = new Random(RandomSeed); + + for (var i = 0; i < testLen; i++) + { + var time = new StartTime(r.Next(0, 10000)); + testArr[i] = time; + } + + Array.Sort(testArr); + + for (var i = 0; i < testLen - 1; i++) + { + Assert.True(testArr[i].TimeStamp <= testArr[i + 1].TimeStamp); + Assert.True(testArr[i].CompareTo(testArr[i + 1]) <= 0); + Assert.True(testArr[i + 1].CompareTo(testArr[i]) >= 0); + } + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d5679922/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs index fed5222..6404796 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs @@ -21,6 +21,7 @@ using System.Linq; using System.Threading; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Implementations.InjectionPlan; +using Org.Apache.REEF.Utilities.Collections; using Org.Apache.REEF.Utilities.Diagnostics; using Org.Apache.REEF.Utilities.Logging; using Org.Apache.REEF.Wake.RX.Impl; @@ -35,7 +36,7 @@ namespace 
Org.Apache.REEF.Wake.Time.Runtime private readonly ITimer _timer; private readonly PubSubSubject<Time> _handlers; - private readonly ISet<Time> _schedule; + private readonly PriorityQueue<Time> _schedule; private readonly IInjectionFuture<ISet<IObserver<StartTime>>> _startHandler; private readonly IInjectionFuture<ISet<IObserver<StopTime>>> _stopHandler; @@ -64,7 +65,7 @@ namespace Org.Apache.REEF.Wake.Time.Runtime [Parameter(typeof(IdleHandler))] IInjectionFuture<ISet<IObserver<IdleClock>>> idleHandler) { _timer = timer; - _schedule = new SortedSet<Time>(); + _schedule = new PriorityQueue<Time>(); _handlers = new PubSubSubject<Time>(); _startHandler = startHandler; @@ -227,9 +228,7 @@ namespace Org.Apache.REEF.Wake.Time.Runtime Monitor.Wait(_schedule, TimeSpan.FromMilliseconds(duration)); } - Time time = _schedule.First(); - _schedule.Remove(time); - return time; + return _schedule.Dequeue(); } /// <summary> http://git-wip-us.apache.org/repos/asf/reef/blob/d5679922/lang/cs/Org.Apache.REEF.Wake/Time/Time.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Time/Time.cs b/lang/cs/Org.Apache.REEF.Wake/Time/Time.cs index d7e37f9..478cef9 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Time/Time.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Time/Time.cs @@ -27,6 +27,11 @@ namespace Org.Apache.REEF.Wake.Time { protected Time(long timeStamp) { + if (timeStamp < 0) + { + throw new ArgumentException("Time must have timeStamp of at least 0."); + } + TimeStamp = timeStamp; } @@ -39,7 +44,7 @@ namespace Org.Apache.REEF.Wake.Time public override int GetHashCode() { - return base.GetHashCode(); + return TimeStamp.GetHashCode(); } public override bool Equals(object obj) @@ -48,33 +53,21 @@ namespace Org.Apache.REEF.Wake.Time { return true; } - Time other = obj as Time; + + // Note: This allows for the very strange semantic where + // two different subclasses might return true for Equals. 
+ var other = obj as Time; if (other != null) { return CompareTo(other) == 0; } + return false; } public int CompareTo(Time other) { - if (TimeStamp < other.TimeStamp) - { - return -1; - } - if (TimeStamp > other.TimeStamp) - { - return 1; - } - if (GetHashCode() < other.GetHashCode()) - { - return -1; - } - if (GetHashCode() > other.GetHashCode()) - { - return 1; - } - return 0; + return TimeStamp.CompareTo(other.TimeStamp); } } }
