Repository: incubator-reef Updated Branches: refs/heads/master 13ffefeab -> 60b9eb54b
[REEF-479] Introduce time mesuarements in BroadcastReduce and PipelineBroadcadtReduce in Network.Examples.Client This addressed the issue by * Introducing time measurements using StopWatch in master and slave tasks. * Making the intermediate computations between broadcast and reduce minimal so as to measure the time of round trip correctly. JIRA: [479](https://issues.apache.org/jira/browse/REEF-479) This Closes #299 Author: Dhruv <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/60b9eb54 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/60b9eb54 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/60b9eb54 Branch: refs/heads/master Commit: 60b9eb54bf47b6a0285d98276abb82eb703135b5 Parents: 13ffefe Author: Dhruv <[email protected]> Authored: Thu Jul 16 13:18:14 2015 -0700 Committer: Julia Wang <[email protected]> Committed: Fri Jul 17 15:28:36 2015 -0700 ---------------------------------------------------------------------- .../BroadcastReduceDriverAndTasks/MasterTask.cs | 31 +++++++++++++-- .../BroadcastReduceDriverAndTasks/SlaveTask.cs | 26 +++++++++++-- .../PipelinedMasterTask.cs | 41 +++++++++++++++----- .../PipelinedSlaveTask.cs | 38 +++++++++++++++--- 4 files changed, 114 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/60b9eb54/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs index 21d23f8..02fc403 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs @@ -18,6 +18,7 @@ */ using System; +using System.Diagnostics; using System.Linq; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Network.Group.Operators; @@ -29,7 +30,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri { public class MasterTask : ITask { - private static readonly Logger _logger = Logger.GetLogger(typeof(MasterTask)); + private static readonly Logger Logger = Logger.GetLogger(typeof(MasterTask)); private readonly int _numIters; private readonly int _numReduceSenders; @@ -45,7 +46,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri [Parameter(typeof(GroupTestConfig.NumEvaluators))] int numEvaluators, IGroupCommClient groupCommClient) { - _logger.Log(Level.Info, "Hello from master task"); + Logger.Log(Level.Info, "Hello from master task"); _numIters = numIters; _numReduceSenders = numEvaluators - 1; _groupCommClient = groupCommClient; @@ -57,20 +58,42 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri public byte[] Call(byte[] memento) { + Stopwatch broadcastTime = new Stopwatch(); + Stopwatch reduceTime = new Stopwatch(); + for (int i = 1; i <= _numIters; i++) { + if (i == 2) + { + broadcastTime.Reset(); + reduceTime.Reset(); + } + + broadcastTime.Start(); // Each slave task calculates the nth triangle number _broadcastSender.Send(i); - + broadcastTime.Stop(); + + reduceTime.Start(); // Sum up all of the calculated triangle numbers int sum = _sumReducer.Reduce(); - _logger.Log(Level.Info, "Received sum: {0} on iteration: {1}", sum, i); + reduceTime.Stop(); + + Logger.Log(Level.Info, "Received sum: {0} on iteration: {1}", sum, i); int expected = TriangleNumber(i) * _numReduceSenders; if (sum != TriangleNumber(i) * _numReduceSenders) { throw new Exception("Expected " + expected + " but got " + sum); } + + if (i >= 2) + { + Logger.Log(Level.Info, + string.Format("Average time (milliseconds) taken for broadcast: {0} and reduce: {1}", + broadcastTime.ElapsedMilliseconds/((double) (i - 1)), + reduceTime.ElapsedMilliseconds/((double) (i - 1)))); + } } return null; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/60b9eb54/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs index 6db3a5c..11f8630 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs @@ -17,6 +17,7 @@ * under the License. */ +using System.Diagnostics; using System.Linq; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Network.Group.Operators; @@ -28,7 +29,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri { public class SlaveTask : ITask { - private static readonly Logger _logger = Logger.GetLogger(typeof(SlaveTask)); + private static readonly Logger Logger = Logger.GetLogger(typeof(SlaveTask)); private readonly int _numIterations; private readonly IGroupCommClient _groupCommClient; @@ -41,7 +42,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri [Parameter(typeof(GroupTestConfig.NumIterations))] int numIters, IGroupCommClient groupCommClient) { - _logger.Log(Level.Info, "Hello from slave task"); + Logger.Log(Level.Info, "Hello from slave task"); _numIterations = numIters; _groupCommClient = groupCommClient; @@ -52,16 +53,33 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri public byte[] Call(byte[] memento) { + Stopwatch broadcastTime = new Stopwatch(); + Stopwatch reduceTime = new Stopwatch(); + for (int i = 0; i < _numIterations; i++) { + broadcastTime.Start(); // Receive n from Master Task int n = _broadcastReceiver.Receive(); - _logger.Log(Level.Info, "Calculating TriangleNumber({0}) on slave task...", n); + broadcastTime.Stop(); + + Logger.Log(Level.Info, "Calculating TriangleNumber({0}) on slave task...", n); // Calculate the nth Triangle number and send it back to driver int triangleNum = TriangleNumber(n); - _logger.Log(Level.Info, "Sending sum: {0} on iteration {1}.", triangleNum, i); + Logger.Log(Level.Info, "Sending sum: {0} on iteration {1}.", triangleNum, i); + + reduceTime.Start(); _triangleNumberSender.Send(triangleNum); + reduceTime.Stop(); + + if (i >= 1) + { + Logger.Log(Level.Info, + string.Format("Average time (milliseconds) taken for broadcast: {0} and reduce: {1}", + broadcastTime.ElapsedMilliseconds / ((double)i), + reduceTime.ElapsedMilliseconds / ((double)i))); + } } return null; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/60b9eb54/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs index 4656fd9..2259cdb 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs @@ -18,6 +18,7 @@ */ using System; +using System.Diagnostics; using System.Linq; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Network.Group.Operators; @@ -63,26 +64,48 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR { int[] intArr = new int[_arraySize]; + for (int j = 0; j < _arraySize; j++) + { + intArr[j] = j; + } + + Stopwatch broadcastTime = new Stopwatch(); + Stopwatch reduceTime = new Stopwatch(); + for (int i = 1; i <= _numIters; i++) { - for (int j = 0; j < _arraySize; j++) + intArr[0] = i; + + if (i == 2) { - intArr[j] = i; + broadcastTime.Reset(); + reduceTime.Reset(); } + broadcastTime.Start(); _broadcastSender.Send(intArr); + broadcastTime.Stop(); + + reduceTime.Start(); int[] sum = _sumReducer.Reduce(); + reduceTime.Stop(); + + Logger.Log(Level.Info, "Received sum: {0} on iteration: {1} with array length: {2}", sum[0], i, + sum.Length); - Logger.Log(Level.Info, "Received sum: {0} on iteration: {1} with array length: {2}", sum[0], i, sum.Length); + int expected = TriangleNumber(i)*_numReduceSenders; - int expected = TriangleNumber(i) * _numReduceSenders; + if (sum[0] != TriangleNumber(i)*_numReduceSenders) + { + throw new Exception("Expected " + expected + " but got " + sum[0]); + } - for (int j = 0; j < intArr.Length; j++) + if (i >= 2) { - if (sum[j] != TriangleNumber(i) * _numReduceSenders) - { - throw new Exception("Expected " + expected + " but got " + sum); - } + Logger.Log(Level.Info, + string.Format("Average time (milliseconds) taken for broadcast: {0} and reduce: {1}", + broadcastTime.ElapsedMilliseconds/((double) (i - 1)), + reduceTime.ElapsedMilliseconds/((double) (i - 1)))); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/60b9eb54/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs index 547df28..5ec77bb 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs @@ -17,6 +17,7 @@ * under the License. */ +using System.Diagnostics; using System.Linq; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Network.Group.Operators; @@ -35,14 +36,17 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR private readonly ICommunicationGroupClient _commGroup; private readonly IBroadcastReceiver<int[]> _broadcastReceiver; private readonly IReduceSender<int[]> _triangleNumberSender; + private readonly int _arraySize; [Inject] public PipelinedSlaveTask( [Parameter(typeof(GroupTestConfig.NumIterations))] int numIters, + [Parameter(typeof(GroupTestConfig.ArraySize))] int arraySize, IGroupCommClient groupCommClient) { Logger.Log(Level.Info, "Hello from slave task"); + _arraySize = arraySize; _numIterations = numIters; _groupCommClient = groupCommClient; _commGroup = _groupCommClient.GetCommunicationGroup(GroupTestConstants.GroupName); @@ -52,25 +56,49 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR public byte[] Call(byte[] memento) { + int[] resArr = new int[_arraySize]; + + for (int j = 0; j < resArr.Length; j++) + { + resArr[j] = j; + } + + Stopwatch broadcastTime = new Stopwatch(); + Stopwatch reduceTime = new Stopwatch(); + for (int i = 0; i < _numIterations; i++) { + if (i == 1) + { + broadcastTime.Reset(); + reduceTime.Reset(); + } + + broadcastTime.Start(); // Receive n from Master Task int[] intVec = _broadcastReceiver.Receive(); + broadcastTime.Stop(); Logger.Log(Level.Info, "Calculating TriangleNumber({0}) on slave task...", intVec[0]); // Calculate the nth Triangle number and send it back to driver int triangleNum = TriangleNumber(intVec[0]); + Logger.Log(Level.Info, "Sending sum: {0} on iteration {1}.", triangleNum, i); - int[] resArr = new int[intVec.Length]; + resArr[0] = triangleNum; + + reduceTime.Start(); + _triangleNumberSender.Send(resArr); + reduceTime.Stop(); - for (int j = 0; j < resArr.Length; j++) + if (i >= 1) { - resArr[j] = triangleNum; + Logger.Log(Level.Info, + string.Format("Average time (milliseconds) taken for broadcast: {0} and reduce: {1}", + broadcastTime.ElapsedMilliseconds/((double) i), + reduceTime.ElapsedMilliseconds/((double) i))); } - - _triangleNumberSender.Send(resArr); } return null;
