Repository: reef
Updated Branches:
  refs/heads/master 2de3998cb -> 026b5ea60


[REEF-1466] Cancel the blocking message reading and close the task properly

Add an optional cancellation token parameter to the Group Communication (GC) and Network APIs
Update IMRU tasks to pass the cancellation token
Update the IMRU close handler to cancel the token
Update existing test cases and add new ones

JIRA: [REEF-1466](https://issues.apache.org/jira/browse/REEF-1466)

Pull Request: Closes #1052


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/026b5ea6
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/026b5ea6
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/026b5ea6

Branch: refs/heads/master
Commit: 026b5ea6038cac6f9e05d24654e43d3e97db6b03
Parents: 2de3998
Author: Julia Wang <[email protected]>
Authored: Tue Jul 5 20:04:24 2016 -0700
Committer: dhruv <[email protected]>
Committed: Tue Jul 12 15:48:45 2016 -0700

----------------------------------------------------------------------
 .../OnREEF/IMRUTasks/MapTaskHost.cs             |  49 +++++-
 .../OnREEF/IMRUTasks/TaskCloseCoordinator.cs    |  57 ++-----
 .../OnREEF/IMRUTasks/UpdateTaskHost.cs          |  88 +++++++----
 .../EnforceCloseTimeoutMilliseconds.cs          |  31 ----
 .../Org.Apache.REEF.IMRU.csproj                 |   1 -
 .../GroupCommunicationTests.cs                  |  98 ++++++++++++
 .../GroupCommunicationTreeTopologyTests.cs      |  90 +++++++++++
 .../Group/Operators/IBroadcastReceiver.cs       |   5 +-
 .../Group/Operators/IReduceReceiver.cs          |   8 +-
 .../Group/Operators/IReduceSender.cs            |   6 +-
 .../Group/Operators/IScatterReceiver.cs         |   4 +-
 .../Group/Operators/Impl/BroadcastReceiver.cs   |   6 +-
 .../Group/Operators/Impl/ReduceReceiver.cs      |   6 +-
 .../Group/Operators/Impl/ReduceSender.cs        |   8 +-
 .../Group/Operators/Impl/ScatterReceiver.cs     |   7 +-
 .../Group/Task/IOperatorTopology.cs             |  12 +-
 .../Group/Task/Impl/ChildNodeContainer.cs       |  10 +-
 .../Group/Task/Impl/NodeStruct.cs               |  20 ++-
 .../Group/Task/Impl/OperatorTopology.cs         |  24 +--
 .../Functional/Bridge/TestCloseTask.cs          | 153 +++++--------------
 .../Functional/IMRU/IMRUCloseTaskTest.cs        | 134 +++++++++++-----
 .../Functional/ReefFunctionalTest.cs            |   8 +-
 22 files changed, 531 insertions(+), 294 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
index 86b1f7c..bce1e4d 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System;
+using System.IO;
 using System.Text;
 using System.Threading;
 using Org.Apache.REEF.Common.Tasks;
@@ -29,6 +30,7 @@ using Org.Apache.REEF.Network.Group.Task;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Utilities.Attributes;
 using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote.Impl;
 
 namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
 {
@@ -63,6 +65,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         private readonly TaskCloseCoordinator _taskCloseCoordinator;
 
         /// <summary>
+        /// The cancellation token to control the group communication 
operation cancellation
+        /// </summary>
+        private readonly CancellationTokenSource _cancellationSource;
+
+        /// <summary>
         /// </summary>
         /// <param name="mapTask">The MapTask hosted in this REEF Task.</param>
         /// <param name="groupCommunicationsClient">Used to setup the 
communications.</param>
@@ -83,6 +90,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
             _dataReducer = 
cg.GetReduceSender<TMapOutput>(IMRUConstants.ReduceOperatorName);
             _invokeGC = invokeGC;
             _taskCloseCoordinator = taskCloseCoordinator;
+            _cancellationSource = new CancellationTokenSource();
         }
 
         /// <summary>
@@ -94,7 +102,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         {
             MapControlMessage controlMessage = MapControlMessage.AnotherRound;
 
-            while (!_taskCloseCoordinator.ShouldCloseTask() && controlMessage 
!= MapControlMessage.Stop)
+            while (!_cancellationSource.IsCancellationRequested && 
controlMessage != MapControlMessage.Stop)
             {
                 if (_invokeGC)
                 {
@@ -103,18 +111,45 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
                     GC.WaitForPendingFinalizers();
                 }
 
-                using (
-                    MapInputWithControlMessage<TMapInput> mapInput = 
_dataAndMessageReceiver.Receive())
+                try
+                {
+                    using (
+                    MapInputWithControlMessage<TMapInput> mapInput = 
_dataAndMessageReceiver.Receive(_cancellationSource))
+                    {
+                        controlMessage = mapInput.ControlMessage;
+                        if (controlMessage != MapControlMessage.Stop)
+                        {
+                            _dataReducer.Send(_mapTask.Map(mapInput.Message), 
_cancellationSource);
+                        }
+                    }
+                }
+                catch (OperationCanceledException e)
+                {
+                    Logger.Log(Level.Warning, "Received 
OperationCanceledException in MapTaskHost with message: {0}.", e.Message);
+                    break;
+                }
+                catch (IOException e)
+                {
+                    Logger.Log(Level.Error, "Received IOException in 
MapTaskHost with message: {0}.", e.Message);
+                    if (!_cancellationSource.IsCancellationRequested)
+                    {
+                        throw new 
IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
+                    }
+                    break;
+                }
+                catch (TcpClientConnectionException e)
                 {
-                    controlMessage = mapInput.ControlMessage;
-                    if (controlMessage != MapControlMessage.Stop)
+                    Logger.Log(Level.Error, "Received 
TcpClientConnectionException in MapTaskHost with message: {0}.", e.Message);
+                    if (!_cancellationSource.IsCancellationRequested)
                     {
-                        _dataReducer.Send(_mapTask.Map(mapInput.Message));
+                        throw new 
IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
                     }
+                    break;
                 }
             }
 
             _taskCloseCoordinator.SignalTaskStopped();
+            Logger.Log(Level.Info, "MapTaskHost returned with cancellation 
token:{0}.", _cancellationSource.IsCancellationRequested);
             return null;
         }
 
@@ -124,7 +159,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         /// <param name="closeEvent"></param>
         public void OnNext(ICloseEvent closeEvent)
         {
-            _taskCloseCoordinator.HandleEvent(closeEvent);
+            _taskCloseCoordinator.HandleEvent(closeEvent, _cancellationSource);
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs
index f60271a..a9014c3 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs
@@ -29,8 +29,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
 {
     /// <summary>
     /// This class provides a method to handle Task close event. It is called 
from TaskCloseEventHandler. 
-    /// It also wraps flags to represent if the task should be closed and if 
the task has been stopped
-    /// so that to provide a coordination between the task and the close 
handler.  
     /// </summary>
     [ThreadSafe]
     internal sealed class TaskCloseCoordinator
@@ -38,74 +36,41 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         private static readonly Logger Logger = 
Logger.GetLogger(typeof(TaskCloseCoordinator));
 
         /// <summary>
-        /// When a close event is received, this variable is set to 1. At the 
beginning of each task iteration,
-        /// if this variable is set to 1, the task will break from the loop 
and return from the Call() method.
-        /// </summary>
-        private long _shouldCloseTask = 0;
-
-        /// <summary>
-        /// Waiting time for the task to close by itself
-        /// </summary>
-        private readonly int _enforceCloseTimeoutMilliseconds;
-
-        /// <summary>
-        /// An event that will wait in close handler until it is either 
signaled from Call method or timeout.
+        /// An event that will wait in close handler to be signaled from Call 
method.
         /// </summary>
         private readonly ManualResetEventSlim _waitToCloseEvent = new 
ManualResetEventSlim(false);
 
         /// <summary>
-        /// Handle task close event and manage the states, wait/signal when 
closing the task
+        /// Handle task close event, wait/signal when closing the task
         /// </summary>
-        /// <param name="enforceCloseTimeoutMilliseconds">Timeout in 
milliseconds to enforce the task to close if receiving task close event</param>
         [Inject]
-        private 
TaskCloseCoordinator([Parameter(typeof(EnforceCloseTimeoutMilliseconds))] int 
enforceCloseTimeoutMilliseconds)
+        private TaskCloseCoordinator()
         {
-            _enforceCloseTimeoutMilliseconds = enforceCloseTimeoutMilliseconds;
         }
 
         /// <summary>
         /// Handle Task close event.
-        /// Set _shouldCloseTask to 1 so that to inform the task to stop at 
the end of the current iteration.
-        /// Then waiting for the signal from Call method. Either it is 
signaled or after _enforceCloseTimeoutMilliseconds,
-        /// If the closed event is sent from driver, checks if the 
_waitToCloseEvent has been signaled. If not, throw 
-        /// IMRUTaskSystemException to enforce the task to stop.
+        /// Cancel the CancellationToken for data reading operation, then 
waiting for the signal from Call method. 
         /// </summary>
         /// <param name="closeEvent"></param>
-        internal void HandleEvent(ICloseEvent closeEvent)
+        /// <param name="cancellationTokenSource"></param>
+        internal void HandleEvent(ICloseEvent closeEvent, 
CancellationTokenSource cancellationTokenSource)
         {
-            Interlocked.Exchange(ref _shouldCloseTask, 1);
-            var taskSignaled = 
_waitToCloseEvent.Wait(TimeSpan.FromMilliseconds(_enforceCloseTimeoutMilliseconds));
+            cancellationTokenSource.Cancel();
+            _waitToCloseEvent.Wait();
 
             if (closeEvent.Value.IsPresent())
             {
-                var msg = Encoding.UTF8.GetString(closeEvent.Value.Value);
-                if (msg.Equals(TaskManager.CloseTaskByDriver))
-                {
-                    Logger.Log(Level.Info, "The task received close event with 
message: {0}.", msg);
-
-                    if (!taskSignaled)
-                    {
-                        throw new 
IMRUTaskSystemException(TaskManager.TaskKilledByDriver);
-                    }
-                }
+                Logger.Log(Level.Info, "The task received close event with 
message: {0}.", Encoding.UTF8.GetString(closeEvent.Value.Value));
             }
             else
             {
-                Logger.Log(Level.Warning, "The task received close event with 
no message.");
+                Logger.Log(Level.Info, "The task received close event with no 
message.");
             }
         }
 
         /// <summary>
-        /// Indicates if the task should be stopped.
-        /// </summary>
-        /// <returns></returns>
-        internal bool ShouldCloseTask()
-        {
-            return Interlocked.Read(ref _shouldCloseTask) == 1;
-        }
-
-        /// <summary>
-        /// Called from Task right before the task is returned to signals 
_waitToCloseEvent. 
+        /// Called from Task right before the task is returned to signals 
_waitToCloseEvent.
         /// </summary>
         internal void SignalTaskStopped()
         {

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
index 116bc63..af4fbf1 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
@@ -16,7 +16,7 @@
 // under the License.
 
 using System;
-using System.Text;
+using System.IO;
 using System.Threading;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Common.Tasks.Events;
@@ -29,6 +29,7 @@ using Org.Apache.REEF.Network.Group.Task;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Utilities.Attributes;
 using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote.Impl;
 
 namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
 {
@@ -65,6 +66,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         private readonly TaskCloseCoordinator _taskCloseCoordinator;
 
         /// <summary>
+        /// The cancellation token to control the group communication 
operation cancellation
+        /// </summary>
+        private readonly CancellationTokenSource _cancellationSource;
+
+        /// <summary>
         /// </summary>
         /// <param name="updateTask">The UpdateTask hosted in this REEF 
Task.</param>
         /// <param name="groupCommunicationsClient">Used to setup the 
communications.</param>
@@ -88,6 +94,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
             _invokeGC = invokeGC;
             _resultHandler = resultHandler;
             _taskCloseCoordinator = taskCloseCoordinator;
+            _cancellationSource = new CancellationTokenSource();
         }
 
         /// <summary>
@@ -99,44 +106,69 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         {
             var updateResult = _updateTask.Initialize();
             int iterNo = 0;
-
-            while (updateResult.HasMapInput && 
!_taskCloseCoordinator.ShouldCloseTask())
+            try
             {
-                iterNo++;
-
-                using (
-                    var message = new 
MapInputWithControlMessage<TMapInput>(updateResult.MapInput,
-                        MapControlMessage.AnotherRound))
+                while (updateResult.HasMapInput && 
!_cancellationSource.IsCancellationRequested)
                 {
-                    _dataAndControlMessageSender.Send(message);
+                    iterNo++;
+
+                    using (
+                        var message = new 
MapInputWithControlMessage<TMapInput>(updateResult.MapInput,
+                            MapControlMessage.AnotherRound))
+                    {
+                        _dataAndControlMessageSender.Send(message);
+                    }
+
+                    var input = _dataReceiver.Reduce(_cancellationSource);
+
+                    if (_invokeGC)
+                    {
+                        Logger.Log(Level.Verbose, "Calling Garbage Collector");
+                        GC.Collect();
+                        GC.WaitForPendingFinalizers();
+                    }
+
+                    updateResult = _updateTask.Update(input);
+
+                    if (updateResult.HasResult)
+                    {
+                        _resultHandler.HandleResult(updateResult.Result);
+                    }
                 }
-
-                var input = _dataReceiver.Reduce();
-
-                if (_invokeGC)
+                if (!_cancellationSource.IsCancellationRequested)
                 {
-                    Logger.Log(Level.Verbose, "Calling Garbage Collector");
-                    GC.Collect();
-                    GC.WaitForPendingFinalizers();
+                    MapInputWithControlMessage<TMapInput> stopMessage =
+                        new 
MapInputWithControlMessage<TMapInput>(MapControlMessage.Stop);
+                    _dataAndControlMessageSender.Send(stopMessage);
                 }
-
-                updateResult = _updateTask.Update(input);
-
-                if (updateResult.HasResult)
+            }
+            catch (OperationCanceledException e)
+            {
+                Logger.Log(Level.Warning,
+                    "Received OperationCanceledException in UpdateTaskHost 
with message: {0}.",
+                    e.Message);
+            }
+            catch (IOException e)
+            {
+                Logger.Log(Level.Error, "Received IOException in 
UpdateTaskHost with message: {0}.", e.Message);
+                if (!_cancellationSource.IsCancellationRequested)
                 {
-                    _resultHandler.HandleResult(updateResult.Result);
+                    throw new 
IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
                 }
             }
-
-            if (!_taskCloseCoordinator.ShouldCloseTask())
+            catch (TcpClientConnectionException e)
             {
-                MapInputWithControlMessage<TMapInput> stopMessage =
-                    new 
MapInputWithControlMessage<TMapInput>(MapControlMessage.Stop);
-                _dataAndControlMessageSender.Send(stopMessage);
+                Logger.Log(Level.Error,
+                    "Received TcpClientConnectionException in UpdateTaskHost 
with message: {0}.",
+                    e.Message);
+                if (!_cancellationSource.IsCancellationRequested)
+                {
+                    throw new 
IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
+                }
             }
-
             _resultHandler.Dispose();
             _taskCloseCoordinator.SignalTaskStopped();
+            Logger.Log(Level.Info, "UpdateTaskHost returned with cancellation 
token {0}.", _cancellationSource.IsCancellationRequested);
             return null;
         }
 
@@ -146,7 +178,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         /// <param name="closeEvent"></param>
         public void OnNext(ICloseEvent closeEvent)
         {
-            _taskCloseCoordinator.HandleEvent(closeEvent);
+            _taskCloseCoordinator.HandleEvent(closeEvent, _cancellationSource);
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/EnforceCloseTimeoutMilliseconds.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/EnforceCloseTimeoutMilliseconds.cs
 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/EnforceCloseTimeoutMilliseconds.cs
deleted file mode 100644
index e177895..0000000
--- 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/EnforceCloseTimeoutMilliseconds.cs
+++ /dev/null
@@ -1,31 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-// 
-//   http://www.apache.org/licenses/LICENSE-2.0
-// 
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using Org.Apache.REEF.Tang.Annotations;
-
-namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
-{
-    /// <summary>
-    /// When driver sends close event to a task, it would expect the task to 
close gracefully. 
-    /// After specified time out, if the task is still not closed, the close 
handler will throw exception, 
-    /// enforce the task to close after waiting for this much time (in 
milliseconds). 
-    /// </summary>
-    [NamedParameter("Enforce the task to close after waiting for this much 
time (in milliseconds).", "EnforceCloseTimeout", "1000")]
-    internal sealed class EnforceCloseTimeoutMilliseconds : Name<int>
-    {
-    }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj 
b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index cdf87cc..30d110a 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -100,7 +100,6 @@ under the License.
     <Compile 
Include="OnREEF\MapInputWithControlMessage\MapInputWithControlMessage.cs" />
     <Compile 
Include="OnREEF\MapInputWithControlMessage\MapInputWithControlMessageCodec.cs" 
/>
     <Compile 
Include="OnREEF\MapInputWithControlMessage\MapInputwithControlMessagePipelineDataConverter.cs"
 />
-    <Compile Include="OnREEF\Parameters\EnforceCloseTimeoutMilliseconds.cs" />
     <Compile Include="OnREEF\Parameters\InvokeGC .cs" />
     <Compile Include="OnREEF\Parameters\AllowedFailedEvaluatorsFraction.cs" />
     <Compile Include="OnREEF\Parameters\CoresForUpdateTask.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
 
b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
index 5eefd44..2f5fe8a 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
@@ -380,6 +380,53 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             Assert.Equal(value, receiver2.Receive());
         }
 
+        /// <summary>
+        /// Test IBroadcastReceiver.Receive() with cancellation token
+        /// </summary>
+        [Fact]
+        public void TestBroadcastOperatorWithCancelation()
+        {
+            string groupName = "group1";
+            string operatorName = "broadcast";
+            string masterTaskId = "task0";
+            string driverId = "Driver Id";
+            int numTasks = 10;
+            int fanOut = 3;
+
+            IGroupCommDriver groupCommDriver = 
GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, 
numTasks);
+
+            var commGroup = groupCommDriver.DefaultGroup
+                .AddBroadcast(operatorName, masterTaskId)
+                .Build();
+
+            List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, 
GetDefaultCodecConfig());
+
+            IBroadcastSender<int> sender = 
commGroups[0].GetBroadcastSender<int>(operatorName);
+            IBroadcastReceiver<int> receiver1 = 
commGroups[1].GetBroadcastReceiver<int>(operatorName);
+            IBroadcastReceiver<int> receiver2 = 
commGroups[2].GetBroadcastReceiver<int>(operatorName);
+
+            Assert.NotNull(sender);
+            Assert.NotNull(receiver1);
+            Assert.NotNull(receiver2);
+
+            var token = new CancellationTokenSource();
+            var taskThread1 = new Thread(() =>
+            {
+                Action receive = () => receiver1.Receive(token);
+                Assert.Throws<OperationCanceledException>(receive);
+            });
+
+            var taskThread2 = new Thread(() =>
+            {
+                Action receive = () => receiver2.Receive(token);
+                Assert.Throws<OperationCanceledException>(receive);
+            });
+
+            taskThread1.Start();
+            taskThread2.Start();
+            token.Cancel();
+        }
+
         [Fact]
         public void TestBroadcastOperatorWithDefaultCodec()
         {
@@ -613,6 +660,57 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             Assert.Equal(4, receiver4.Receive().Single());
         }
 
+        /// <summary>
+        /// Test IScatterRecever.Receive() with and without Cancellation token.
+        /// </summary>
+        [Fact]
+        public void TestScatterOperatorWithCancellation()
+        {
+            string groupName = "group1";
+            string operatorName = "scatter";
+            string masterTaskId = "task0";
+            string driverId = "Driver Id";
+            int numTasks = 5;
+            int fanOut = 2;
+
+            IGroupCommDriver groupCommDriver = 
GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, 
numTasks);
+
+            var commGroup = groupCommDriver.DefaultGroup
+                .AddScatter(operatorName, masterTaskId)
+                .Build();
+
+            List<ICommunicationGroupClient> commGroups = 
CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, 
GetDefaultCodecConfig());
+
+            IScatterSender<int> sender = 
commGroups[0].GetScatterSender<int>(operatorName);
+            IScatterReceiver<int> receiver1 = 
commGroups[1].GetScatterReceiver<int>(operatorName);
+            IScatterReceiver<int> receiver2 = 
commGroups[2].GetScatterReceiver<int>(operatorName);
+            IScatterReceiver<int> receiver3 = 
commGroups[3].GetScatterReceiver<int>(operatorName);
+            IScatterReceiver<int> receiver4 = 
commGroups[4].GetScatterReceiver<int>(operatorName);
+
+            Assert.NotNull(sender);
+            Assert.NotNull(receiver1);
+            Assert.NotNull(receiver2);
+            Assert.NotNull(receiver3);
+            Assert.NotNull(receiver4);
+
+            List<int> data = new List<int> { 1, 2, 3, 4 };
+            var token = new CancellationTokenSource();
+            sender.Send(data);
+            Assert.Equal(1, receiver1.Receive(token).Single());
+            Assert.Equal(2, receiver2.Receive(token).Single());
+            Assert.Equal(3, receiver3.Receive(token).Single());
+            Assert.Equal(4, receiver4.Receive(token).Single());
+
+            var taskThread = new Thread(() =>
+            {
+                Action receive = () => receiver1.Receive(token);
+                Assert.Throws<OperationCanceledException>(receive);
+            });
+
+            taskThread.Start();
+            token.Cancel();
+        }
+
         [Fact]
         public void TestScatterOperator2()
         {

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
 
b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
index 45c9bbd..ee037c8 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
@@ -15,8 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System;
 using System.Collections.Generic;
 using System.Linq;
+using System.Threading;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver;
 using Org.Apache.REEF.Network.Group.Operators;
@@ -635,6 +637,94 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
             Assert.Equal(sum, 6325);
         }
 
+        /// <summary>
+        /// Test IReduceSender.Send() and IReduceReceiver.Receive() with and 
without cancellation token
+        /// </summary>
+        [Fact]
+        public void TestScatterReduceOperatorsWithCancelation()
+        {
+            string groupName = "group1";
+            string scatterOperatorName = "scatter";
+            string reduceOperatorName = "reduce";
+            string masterTaskId = "task0";
+            string driverId = "Driver Id";
+            int numTasks = 5;
+            int fanOut = 2;
+
+            var groupCommDriver = 
GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
+            ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
+              .AddScatter<int>(
+                    scatterOperatorName,
+                    masterTaskId,
+                    TopologyTypes.Tree,
+                    GetDefaultDataConverterConfig())
+                .AddReduce<int>(
+                    reduceOperatorName,
+                    masterTaskId,
+                    TopologyTypes.Tree,
+                    GetDefaultDataConverterConfig(),
+                    GetDefaultReduceFuncConfig())
+                .Build();
+
+            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, 
commGroup, GetDefaultCodecConfig());
+
+            IScatterSender<int> sender = 
commGroups[0].GetScatterSender<int>(scatterOperatorName);
+            IReduceReceiver<int> sumReducer = 
commGroups[0].GetReduceReceiver<int>(reduceOperatorName);
+
+            IScatterReceiver<int> receiver1 = 
commGroups[1].GetScatterReceiver<int>(scatterOperatorName);
+            IReduceSender<int> sumSender1 = 
commGroups[1].GetReduceSender<int>(reduceOperatorName);
+
+            IScatterReceiver<int> receiver2 = 
commGroups[2].GetScatterReceiver<int>(scatterOperatorName);
+            IReduceSender<int> sumSender2 = 
commGroups[2].GetReduceSender<int>(reduceOperatorName);
+
+            IScatterReceiver<int> receiver3 = 
commGroups[3].GetScatterReceiver<int>(scatterOperatorName);
+            IReduceSender<int> sumSender3 = 
commGroups[3].GetReduceSender<int>(reduceOperatorName);
+
+            IScatterReceiver<int> receiver4 = 
commGroups[4].GetScatterReceiver<int>(scatterOperatorName);
+            IReduceSender<int> sumSender4 = 
commGroups[4].GetReduceSender<int>(reduceOperatorName);
+
+            Assert.NotNull(sender);
+            Assert.NotNull(receiver1);
+            Assert.NotNull(receiver2);
+            Assert.NotNull(receiver3);
+            Assert.NotNull(receiver4);
+
+            List<int> data = Enumerable.Range(1, 100).ToList();
+
+            sender.Send(data);
+
+            List<int> data1 = receiver1.Receive();
+            List<int> data2 = receiver2.Receive();
+
+            List<int> data3 = receiver3.Receive();
+            List<int> data4 = receiver4.Receive();
+
+            int sum3 = data3.Sum();
+            sumSender3.Send(sum3);
+
+            int sum4 = data4.Sum();
+            sumSender4.Send(sum4);
+
+            int sum2 = data2.Sum();
+            sumSender2.Send(sum2);
+
+            int sum1 = data1.Sum();
+
+            var token = new CancellationTokenSource();
+            token.Cancel();
+            Action send = () => sumSender1.Send(sum1, token);
+            Assert.Throws<OperationCanceledException>(send);
+
+            var taskThread = new Thread(() =>
+            {
+                Action reduce = () => sumReducer.Reduce(token);
+                Assert.Throws<OperationCanceledException>(reduce);
+            });
+
+            taskThread.Start();
+            token.Cancel();
+        }
+
         private IConfiguration GetDefaultCodecConfig()
         {
             return StreamingCodecConfiguration<int>.Conf

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastReceiver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastReceiver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastReceiver.cs
index 209e6fd..da9a0ec 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastReceiver.cs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System.Threading;
+
 namespace Org.Apache.REEF.Network.Group.Operators
 {
     /// <summary>
@@ -26,7 +28,8 @@ namespace Org.Apache.REEF.Network.Group.Operators
         /// <summary>
         /// Receive a message from parent BroadcastSender.
         /// </summary>
+        /// <param name="cancellationSource">The cancellation token for the 
data reading operation cancellation</param>
         /// <returns>The incoming message</returns>
-        T Receive();
+        T Receive(CancellationTokenSource cancellationSource = null);
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceReceiver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceReceiver.cs
index 1490834..89db753 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceReceiver.cs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System.Threading;
+
 namespace Org.Apache.REEF.Network.Group.Operators
 {
     /// <summary>
@@ -26,13 +28,15 @@ namespace Org.Apache.REEF.Network.Group.Operators
         /// <summary>
         /// Returns the class used to reduce incoming messages sent by 
ReduceSenders.
         /// </summary>
-        IReduceFunction<T> ReduceFunction { get; } 
+        IReduceFunction<T> ReduceFunction { get; }
 
         /// <summary>
         /// Receives messages sent by all ReduceSenders and aggregates them
         /// using the specified IReduceFunction.
         /// </summary>
+        /// <param name="cancellationSource">The cancellation token for the 
data reading operation cancellation</param>
         /// <returns>The single aggregated data</returns>
-        T Reduce();
+        //// TODO : REEF-1489 to remove null
+        T Reduce(CancellationTokenSource cancellationSource = null);
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs
index 6c3aca6..f8d22d4 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System.Threading;
+
 namespace Org.Apache.REEF.Network.Group.Operators
 {
     /// <summary>
@@ -27,6 +29,8 @@ namespace Org.Apache.REEF.Network.Group.Operators
         /// Get reduced data from children, reduce with the data given, then 
sends reduced data to parent
         /// </summary>
         /// <param name="data">The data to send</param>
-        void Send(T data);
+        /// <param name="cancellationSource">The cancellation token for the 
data reading operation cancellation</param>
+        //// TODO : REEF-1489 to remove null
+        void Send(T data, CancellationTokenSource cancellationSource = null);
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterReceiver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterReceiver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterReceiver.cs
index 9b313e4..5b622b9 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterReceiver.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System.Collections.Generic;
+using System.Threading;
 
 namespace Org.Apache.REEF.Network.Group.Operators
 {
@@ -30,6 +31,7 @@ namespace Org.Apache.REEF.Network.Group.Operators
         /// Receive a sublist of messages sent from the IScatterSender.
         /// </summary>
         /// <returns>The sublist of messages</returns>
-        List<T> Receive();
+        //// TODO : REEF-1489 to remove null
+        List<T> Receive(CancellationTokenSource cancellationSource = null);
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
index bc72fea..cad774d 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System.Collections.Generic;
+using System.Threading;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Task;
@@ -87,15 +88,16 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <summary>
         /// Receive a message from parent BroadcastSender.
         /// </summary>
+        /// <param name="cancellationSource">The cancellation token for the 
data reading operation cancellation</param>
         /// <returns>The incoming message</returns>
-        public T Receive()
+        public T Receive(CancellationTokenSource cancellationSource = null)
         {
             PipelineMessage<T> message;
             var messageList = new List<PipelineMessage<T>>();
 
             do
             {
-                message = _topology.ReceiveFromParent();
+                message = _topology.ReceiveFromParent(cancellationSource);
 
                 if (_topology.HasChildren())
                 {

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
index e94c1ea..fb129da 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System.Collections.Generic;
+using System.Threading;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Task;
 using Org.Apache.REEF.Network.Group.Task.Impl;
@@ -99,15 +100,16 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// Receives messages sent by all ReduceSenders and aggregates them
         /// using the specified IReduceFunction.
         /// </summary>
+        /// <param name="cancellationSource">The cancellation token for the 
data reading operation cancellation</param>
         /// <returns>The single aggregated data</returns>
-        public T Reduce()
+        public T Reduce(CancellationTokenSource cancellationSource = null)
         {
             PipelineMessage<T> message;
             var messageList = new List<PipelineMessage<T>>();
 
             do
             {
-                message = _topology.ReceiveFromChildren(_pipelinedReduceFunc);
+                message = _topology.ReceiveFromChildren(_pipelinedReduceFunc, 
cancellationSource);
                 messageList.Add(message);
             } 
             while (!message.IsLast);

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
index 813db3e..76202b2 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
@@ -16,8 +16,8 @@
 // under the License.
 
 using System;
-using System.Reactive;
 using System.Collections.Generic;
+using System.Threading;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Task;
@@ -50,7 +50,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <param name="initialize">Require Topology Initialize to be called 
to wait for all task being registered. 
         /// Default is true. For unit testing, it can be set to false.</param>
         /// <param name="topology">The Task's operator topology graph</param>
-        /// <param name="networkHandler">The handler used to handle incoming 
messages</param>
         /// <param name="reduceFunction">The function used to reduce the 
incoming messages</param>
         /// <param name="dataConverter">The converter used to convert original
         /// message to pipelined ones and vice versa.</param>
@@ -105,7 +104,8 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// Sends the data to the operator's ReduceReceiver to be aggregated.
         /// </summary>
         /// <param name="data">The data to send</param>
-        public void Send(T data)
+        /// <param name="cancellationSource">The cancellation source used to cancel the operation</param>
+        public void Send(T data, CancellationTokenSource cancellationSource = 
null)
         {
             var messageList = PipelineDataConverter.PipelineMessage(data);
 
@@ -118,7 +118,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             {
                 if (_topology.HasChildren())
                 {
-                    var reducedValueOfChildren = 
_topology.ReceiveFromChildren(_pipelinedReduceFunc);
+                    var reducedValueOfChildren = 
_topology.ReceiveFromChildren(_pipelinedReduceFunc, cancellationSource);
 
                     var mergeddData = new List<PipelineMessage<T>> { message };
 

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
index d6fdfa1..c9d3c49 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
@@ -17,6 +17,7 @@
 
 using System.Collections.Generic;
 using System.Linq;
+using System.Threading;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Task;
@@ -84,15 +85,15 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// Receive a sublist of messages sent from the IScatterSender.
         /// </summary>
         /// <returns>The sublist of messages</returns>
-        public List<T> Receive()
+        public List<T> Receive(CancellationTokenSource cancellationSource = 
null)
         {
-            IList<T> elements = _topology.ReceiveListFromParent();
+            IList<T> elements = 
_topology.ReceiveListFromParent(cancellationSource);
             _topology.ScatterToChildren(elements, MessageType.Data);
             return elements.ToList();
         }
 
         /// <summary>
-        /// Ensure all parent and children nodes in the topology are 
registered with teh Name Service.
+        /// Ensure all parent and children nodes in the topology are 
registered with the Name Service.
         /// </summary>
         void IGroupCommOperatorInternal.WaitForRegistration()
         {

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs
index bc9a1be..97877e8 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System.Collections.Generic;
+using System.Threading;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Operators;
 
@@ -79,22 +80,27 @@ namespace Org.Apache.REEF.Network.Group.Task
         /// <summary>
         /// Receive an incoming message from the parent Task.
         /// </summary>
+        /// <param name="cancellationSource">The cancellation token for the 
data reading operation cancellation</param>
         /// <returns>The parent Task's message</returns>
-        T ReceiveFromParent();
+        //// TODO : REEF-1489 to remove null
+        T ReceiveFromParent(CancellationTokenSource cancellationSource = null);
 
         /// <summary>
         /// Receive a list of incoming messages from the parent Task.
         /// </summary>
         /// <returns>The parent Task's list of messages</returns>
-        IList<T> ReceiveListFromParent();
+        //// TODO : REEF-1489 to remove null
+        IList<T> ReceiveListFromParent(CancellationTokenSource 
cancellationSource = null);
 
         /// <summary>
         /// Receives all messages from child Tasks and reduces them with the
         /// given IReduceFunction.
         /// </summary>
         /// <param name="reduceFunction">The class used to reduce 
messages</param>
+        /// <param name="cancellationSource">The cancellationSource to cancel 
the operation</param>
         /// <returns>The result of reducing messages</returns>
-        T ReceiveFromChildren(IReduceFunction<T> reduceFunction);
+        //// TODO : REEF-1489 to remove null
+        T ReceiveFromChildren(IReduceFunction<T> reduceFunction, 
CancellationTokenSource cancellationSource = null);
 
         /// <summary>
         /// Checks if the node has children

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/ChildNodeContainer.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/ChildNodeContainer.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/ChildNodeContainer.cs
index 297212e..37c91a4 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/ChildNodeContainer.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/ChildNodeContainer.cs
@@ -18,7 +18,9 @@
 using System.Collections;
 using System.Collections.Generic;
 using System.Linq;
+using System.Threading;
 using Org.Apache.REEF.Utilities.Attributes;
+using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Network.Group.Task.Impl
 {
@@ -28,6 +30,8 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
     [NotThreadSafe]
     internal sealed class ChildNodeContainer<T> : IEnumerable<NodeStruct<T>>
     {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(ChildNodeContainer<T>));
+
         private readonly Dictionary<string, NodeStruct<T>> _childIdToNodeMap = 
             new Dictionary<string, NodeStruct<T>>();
 
@@ -68,9 +72,11 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <summary>
         /// Gets the data from all children nodes synchronously.
         /// </summary>
-        public IEnumerable<T> GetDataFromAllChildren()
+        /// <param name="cancellationSource">The cancellation token for 
GetData operation</param>
+        public IEnumerable<T> GetDataFromAllChildren(CancellationTokenSource 
cancellationSource = null)
         {
-            return this.SelectMany(child => child.GetData());
+            var r = this.SelectMany(child => 
child.GetData(cancellationSource));
+            return r;
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
index 2140c61..ddc7e8c 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
@@ -15,8 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System;
 using System.Collections.Concurrent;
+using System.Threading;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Network.Group.Task.Impl
 {
@@ -27,6 +30,8 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
     /// <typeparam name="T"> Generic type of message</typeparam>
     internal sealed class NodeStruct<T>
     {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(NodeStruct<T>));
+
         private readonly BlockingCollection<GroupCommunicationMessage<T>> 
_messageQueue;
 
         /// <summary>
@@ -62,10 +67,21 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <summary>
         /// Gets the first message in the message queue.
         /// </summary>
+        /// <param name="cancellationSource">The cancellation token for the 
data reading operation cancellation</param>
         /// <returns>The first available message.</returns>
-        internal T[] GetData()
+        internal T[] GetData(CancellationTokenSource cancellationSource = null)
         {
-            return _messageQueue.Take().Data;
+            if (cancellationSource == null || 
!cancellationSource.IsCancellationRequested)
+            {
+                var r = cancellationSource == null
+                    ? _messageQueue.Take().Data
+                    : _messageQueue.Take(cancellationSource.Token).Data;
+                return r;
+            }
+            else
+            {
+                throw new OperationCanceledException("GetData operation is 
canceled");
+            }
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
index 66faa29..c4d4d0a 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
@@ -62,8 +62,8 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <param name="groupName">The name of the operator's Communication 
Group</param>
         /// <param name="taskId">The operator's Task identifier</param>
         /// <param name="timeout">Timeout value for cancellation token</param>
-        /// <param name="retryCount">Number of times to retry wating for 
registration</param>
-        /// <param name="sleepTime">Sleep time between retry wating for 
registration</param>
+        /// <param name="retryCount">Number of times to retry waiting for 
registration</param>
+        /// <param name="sleepTime">Sleep time between retry waiting for 
registration</param>
         /// <param name="rootId">The identifier for the root Task in the 
topology graph</param>
         /// <param name="childIds">The set of child Task identifiers in the 
topology graph</param>
         /// <param name="networkObserver"></param>
@@ -244,10 +244,11 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <summary>
         /// Receive an incoming message from the parent Task.
         /// </summary>
+        /// <param name="cancellationSource">The cancellation token for the 
data reading operation cancellation</param>
         /// <returns>The parent Task's message</returns>
-        public T ReceiveFromParent()
+        public T ReceiveFromParent(CancellationTokenSource cancellationSource 
= null)
         {
-            T[] data = _parent.GetData();
+            T[] data = _parent.GetData(cancellationSource);
             if (data == null || data.Length != 1)
             {
                 throw new InvalidOperationException("Cannot receive data from 
parent node");
@@ -256,9 +257,14 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             return data[0];
         }
 
-        public IList<T> ReceiveListFromParent()
+        /// <summary>
+        /// Receive a list of incoming messages from the parent Task.
+        /// </summary>
+        /// <param name="cancellationSource">The cancellation token for the 
data reading operation cancellation</param>
+        /// <returns>The parent Task's list of messages</returns>
+        public IList<T> ReceiveListFromParent(CancellationTokenSource 
cancellationSource = null)
         {
-            T[] data = _parent.GetData();
+            T[] data = _parent.GetData(cancellationSource);
             if (data == null || data.Length == 0)
             {
                 throw new InvalidOperationException("Cannot receive data from 
parent node");
@@ -272,15 +278,16 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// given IReduceFunction.
         /// </summary>
         /// <param name="reduceFunction">The class used to reduce 
messages</param>
+        /// <param name="cancellationSource">The cancellation token for the 
data reading operation cancellation</param>
         /// <returns>The result of reducing messages</returns>
-        public T ReceiveFromChildren(IReduceFunction<T> reduceFunction)
+        public T ReceiveFromChildren(IReduceFunction<T> reduceFunction, 
CancellationTokenSource cancellationSource = null)
         {
             if (reduceFunction == null)
             {
                 throw new ArgumentNullException("reduceFunction");
             }
 
-            return 
reduceFunction.Reduce(_childNodeContainer.GetDataFromAllChildren());
+            return 
reduceFunction.Reduce(_childNodeContainer.GetDataFromAllChildren(cancellationSource));
         }
 
         public bool HasChildren()
@@ -297,7 +304,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         {
             GeneralGroupCommunicationMessage gcm = new 
GroupCommunicationMessage<T>(_groupName, _operatorName,
                 _selfId, node.Identifier, message);
-
             _sender.Send(gcm);
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
index d50489c..5f08b56 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System;
+using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Text;
 using System.Threading;
@@ -29,7 +30,6 @@ using Org.Apache.REEF.Driver.Evaluator;
 using Org.Apache.REEF.Driver.Task;
 using Org.Apache.REEF.IMRU.OnREEF.Driver;
 using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
-using Org.Apache.REEF.IMRU.OnREEF.Parameters;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Formats;
 using Org.Apache.REEF.Tang.Implementations.Configuration;
@@ -55,6 +55,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
         private const string CompletedValidationMessage = 
"CompletedValidationmessage";
         private const string FailToCloseTaskMessage = "FailToCloseTaskMessage";
         private const string BreakTaskMessage = "BreakTaskMessage";
+        private const string CancelTaskMessage = "CancelTaskMessage";
         private const string EnforceToCloseMessage = "EnforceToCloseMessage";
 
         /// <summary>
@@ -74,35 +75,18 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
         }
 
         /// <summary>
-        /// This test is to close a running task and enforce it to break and 
return after the current iteration
+        /// This test closes a running task by canceling it through its CancellationToken
         /// </summary>
         [Fact]
-        public void TestBreakTaskOnLocalRuntime()
+        public void TestCancelTaskWithTaskCloseCoordinatorOnLocalRuntime()
         {
             string testFolder = DefaultRuntimeFolder + 
Guid.NewGuid().ToString("N").Substring(0, 4);
-            TestRun(DriverConfigurations(DisposeMessageFromDriver, 
GetTaskConfigurationForBreakTask()), typeof(CloseTaskTestDriver), 1, 
"TestBreakTask", "local", testFolder);
+            TestRun(DriverConfigurations(DisposeMessageFromDriver, 
GetTaskConfigurationForCancellationTask()), typeof(CloseTaskTestDriver), 1, 
"TestBreakTask", "local", testFolder);
             ValidateSuccessForLocalRuntime(1, testFolder: testFolder);
             
ValidateMessageSuccessfullyLoggedForDriver(CompletedValidationMessage, 
testFolder, 1);
             var messages = new List<string>();
             messages.Add(DisposeMessageFromDriver);
-            messages.Add(BreakTaskMessage);
-            ValidateMessageSuccessfullyLogged(messages, "Node-*", 
EvaluatorStdout, testFolder, -1);
-            CleanUp(testFolder);
-        }
-
-        /// <summary>
-        /// This test is to close a running task and enforce it to break and 
return after the current iteration
-        /// </summary>
-        [Fact]
-        public void TestEnforceCloseTaskOnLocalRuntime()
-        {
-            string testFolder = DefaultRuntimeFolder + 
Guid.NewGuid().ToString("N").Substring(0, 4);
-            TestRun(DriverConfigurations(DisposeMessageFromDriver, 
GetTaskConfigurationForEnforceToCloseTask()), typeof(CloseTaskTestDriver), 1, 
"TestEnforceCloseTask", "local", testFolder);
-            ValidateSuccessForLocalRuntime(0, 0, 1, testFolder);
-            
ValidateMessageSuccessfullyLoggedForDriver(CompletedValidationMessage, 
testFolder, 0);
-            var messages = new List<string>();
-            messages.Add(DisposeMessageFromDriver);
-            messages.Add(EnforceToCloseMessage);
+            messages.Add(CancelTaskMessage);
             ValidateMessageSuccessfullyLogged(messages, "Node-*", 
EvaluatorStdout, testFolder, -1);
             CleanUp(testFolder);
         }
@@ -149,26 +133,12 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
                 .Build();
         }
 
-        private IConfiguration GetTaskConfigurationForBreakTask()
+        private IConfiguration GetTaskConfigurationForCancellationTask()
         {
             return TaskConfiguration.ConfigurationModule
                 .Set(TaskConfiguration.Identifier, "TaskID")
-                .Set(TaskConfiguration.Task, 
GenericType<TestCloseTask.CloseByBreakAndEnforceToStopTask>.Class)
-                .Set(TaskConfiguration.OnClose, 
GenericType<TestCloseTask.CloseByBreakAndEnforceToStopTask>.Class)
-                .Build();
-        }
-        private IConfiguration GetTaskConfigurationForEnforceToCloseTask()
-        {
-            var taskConfig = TaskConfiguration.ConfigurationModule
-                .Set(TaskConfiguration.Identifier, "TaskID-EnforceToClose")
-                .Set(TaskConfiguration.Task, 
GenericType<TestCloseTask.CloseByBreakAndEnforceToStopTask>.Class)
-                .Set(TaskConfiguration.OnClose, 
GenericType<TestCloseTask.CloseByBreakAndEnforceToStopTask>.Class)
-                .Build();
-
-            return TangFactory.GetTang()
-                .NewConfigurationBuilder(taskConfig)
-                .BindIntNamedParam<EnforceCloseTimeoutMilliseconds>("1000")
-                .BindNamedParameter<EnforceClose, 
bool>(GenericType<EnforceClose>.Class, "true")
+                .Set(TaskConfiguration.Task, 
GenericType<TestCloseTask.CloseByCancellationTask>.Class)
+                .Set(TaskConfiguration.OnClose, 
GenericType<TestCloseTask.CloseByCancellationTask>.Class)
                 .Build();
         }
 
@@ -321,7 +291,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
             private readonly CountdownEvent _suspendSignal = new 
CountdownEvent(1);
 
             [Inject]
-            private 
CloseByReturnTestTask([Parameter(typeof(EnforceCloseTimeoutMilliseconds))] int 
enforceCloseTimeout)
+            private CloseByReturnTestTask()
             {
             }
 
@@ -364,108 +334,69 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
             }
         }
 
-        /// <summary>
-        /// This is a testing task. It serves for two test cases.
-        /// In the first case, EnforceClose is false (default). When the task 
receives the close event, it signals the Call method
-        /// to let it continue the iteration. As _shouldCloseTask is set to 1, 
the Call() will return after
-        /// completing the current iteration.
-        /// In the second case, EnforceClose is set to true. When the task 
receives the close event, it sets
-        /// _shouldCloseTask to 1. As the task is hung in this scenario, 
Call() would never return.
-        ///  After waiting for _enforceCloseTimeoutMilliseconds, the close 
handler throws an exception, enforcing the task to stop.
-        /// </summary>
-        private sealed class CloseByBreakAndEnforceToStopTask : ITask, 
IObserver<ICloseEvent>
+        private sealed class CloseByCancellationTask : ITask, 
IObserver<ICloseEvent>
         {
-            private long _shouldCloseTask = 0;
-            private long _isTaskStopped = 0;
-            private readonly bool _enforceClose;
-            private readonly int _enforceCloseTimeoutMilliseconds;
+            /// <summary>
+            /// Task close coordinator that handles the work when a task close event is received
+            /// </summary>
+            private readonly TaskCloseCoordinator _taskCloseCoordinator;
+
+            /// <summary>
+            /// Cancellation source whose token cancels the blocking read in Call(); it is passed to the TaskCloseCoordinator when the close event arrives
+            /// </summary>
+            private readonly CancellationTokenSource _cancellationSource;
 
-            private readonly CountdownEvent _suspendSignal1 = new 
CountdownEvent(1);
-            private readonly CountdownEvent _suspendSignal2 = new 
CountdownEvent(1);
-            private readonly ManualResetEventSlim _waitToCloseEvent = new 
ManualResetEventSlim(false);
+            /// <summary>
+            /// A blocking collection used to simulate a blocking data read in Call()
+            /// </summary>
+            private readonly BlockingCollection<int> _messageQueue;
 
             [Inject]
-            private CloseByBreakAndEnforceToStopTask(
-                [Parameter(typeof(EnforceCloseTimeoutMilliseconds))] int 
enforceCloseTimeoutMilliseconds,
-                [Parameter(typeof(EnforceClose))] bool enforceClose)
+            private CloseByCancellationTask(TaskCloseCoordinator 
taskCloseCoordinator)
             {
-                _enforceClose = enforceClose;
-                _enforceCloseTimeoutMilliseconds = 
enforceCloseTimeoutMilliseconds;
+                _taskCloseCoordinator = taskCloseCoordinator;
+                _cancellationSource = new CancellationTokenSource();
+                _messageQueue = new BlockingCollection<int>();
             }
 
+            /// <summary>
+            /// Blocks until the read is canceled, then signals the TaskCloseCoordinator that the task has stopped
+            /// </summary>
+            /// <param name="memento">Not used.</param>
+            /// <returns>Always null.</returns>
             public byte[] Call(byte[] memento)
             {
-                int iterate = 1;
-
-                while (Interlocked.Read(ref _shouldCloseTask) == 0 && iterate 
< 100)
+                try
                 {
-                    iterate++;
-                    if (_enforceClose)
-                    {
-                        _suspendSignal1.Wait();
-                    }
-                    else
-                    {
-                        _suspendSignal2.Wait();
-                    }
+                    _messageQueue.Take(_cancellationSource.Token);
                 }
-
-                Interlocked.Exchange(ref _isTaskStopped, 1);
-
-                if (Interlocked.Read(ref _shouldCloseTask) == 1)
+                catch (OperationCanceledException)
                 {
-                    Logger.Log(Level.Info, BreakTaskMessage);
-                    _waitToCloseEvent.Set();
+                    Logger.Log(Level.Info, CancelTaskMessage);
                 }
-
+                _taskCloseCoordinator.SignalTaskStopped();
                 return null;
             }
 
             public void Dispose()
             {
-                Logger.Log(Level.Info, "Task is disposed.");
             }
 
             /// <summary>
-            /// When the close event is received, it sets _shouldCloseTask to 
1.
-            /// If _enforceClose is false, _suspendSignal2 is signaled to let 
the task to continue to run. This is to simulate that the
-            /// task is running properly and will break after completing the 
current iteration. It will set the _waitToCloseEvent
-            /// to let the flow in the close event handler to continue.
-            /// If _enforceClose is true,  _suspendSignal1 will be not 
signaled, this is to simulate that the task is hung.
-            /// After waiting for specified time, the close handler will throw 
exception to enforce the task to stop.
+            /// Task close handler. Delegates the close event to the TaskCloseCoordinator together with the cancellation source, so that the blocking Call() can be canceled.
             /// </summary>
             /// <param name="closeEvent"></param>
             public void OnNext(ICloseEvent closeEvent)
             {
-                if (closeEvent.Value.IsPresent() && 
Encoding.UTF8.GetString(closeEvent.Value.Value).Equals(DisposeMessageFromDriver))
-                {
-                    Logger.Log(Level.Info, "Closed event received in task:" + 
Encoding.UTF8.GetString(closeEvent.Value.Value));
-                    Interlocked.Exchange(ref _shouldCloseTask, 1);
-                    if (!_enforceClose)
-                    {
-                        _suspendSignal2.Signal();
-                    }
-
-                    
_waitToCloseEvent.Wait(TimeSpan.FromMilliseconds(_enforceCloseTimeoutMilliseconds));
-
-                    if (Interlocked.Read(ref _isTaskStopped) == 0)
-                    {
-                        Logger.Log(Level.Info, EnforceToCloseMessage);
-                        throw new 
IMRUTaskSystemException(TaskManager.TaskKilledByDriver);
-                    }
-                }
-                else
-                {
-                    throw new Exception("Expected close event message is not 
received.");
-                }
+                _taskCloseCoordinator.HandleEvent(closeEvent, 
_cancellationSource);
             }
 
-            public void OnCompleted()
+            public void OnError(Exception error)
             {
                 throw new NotImplementedException();
             }
 
-            public void OnError(Exception error)
+            public void OnCompleted()
             {
                 throw new NotImplementedException();
             }

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
index c3521cd..b462438 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
@@ -16,9 +16,13 @@
 // under the License.
 
 using System;
+using System.Collections.Generic;
+using Org.Apache.REEF.Driver.Evaluator;
 using Org.Apache.REEF.Driver.Task;
 using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.Network;
 using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Utilities;
@@ -30,24 +34,23 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
 {
     /// <summary>
     /// This is to test close event handler in IMRU tasks
-    /// The test provide IRunningTask, IFailedTask and ICOmpletedTask handlers 
so that to trigger close events and handle the 
+    /// The test provides IRunningTask, IFailedTask and ICompletedTask handlers so as to trigger close events and handle the 
     /// failed tasks and completed tasks
     /// </summary>
     [Collection("FunctionalTests")]
     public class IMRUCloseTaskTest : IMRUBrodcastReduceTestBase
     {
         private const string CompletedTaskMessage = "CompletedTaskMessage";
+        private const string FailEvaluatorMessage = "FailEvaluatorMessage";
         private const string FailTaskMessage = "FailTaskMessage";
 
         /// <summary>
         /// This test is for running in local runtime
         /// It sends close event for all the running tasks.
-        /// It first informs the Call method to stop.
-        /// If Call method is running properly, it will respect to this flag 
and will return properly, that will end up ICompletedTask event.
-        ////If Call method is hung some where and cannot be returned, the 
close handler will throw exception, that would cause IFailedTask event.
-        /// As we are testing IMRU Task not a test task, the behavior is not 
deterministic. It can be CompletedTask or FailedTask
-        /// No matter how the task is closed, the total number of completed 
task and failed task should be equal to the 
-        /// total number of the tasks.
+        /// In the task close handler, the cancellation token is canceled; as a result, tasks return from the Call() method and the driver receives ICompletedTask.
+        /// In the exceptional case, a task might throw an exception from the Call() method; as a result, the driver receives IFailedTask.
+        /// The number of completed tasks plus failed tasks is expected to equal the total number of tasks, with no failed evaluator.
         /// </summary>
         [Fact]
         public void TestTaskCloseOnLocalRuntime()
@@ -55,27 +58,21 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             const int chunkSize = 2;
             const int dims = 50;
             const int iterations = 200;
-            const int mapperMemory = 5120;
-            const int updateTaskMemory = 5120;
+            const int mapperMemory = 512;
+            const int updateTaskMemory = 512;
             const int numTasks = 4;
             var testFolder = DefaultRuntimeFolder + TestId;
             TestBroadCastAndReduce(false, numTasks, chunkSize, dims, 
iterations, mapperMemory, updateTaskMemory, testFolder);
-            string[] lines = ReadLogFile(DriverStdout, "driver", testFolder);
-            var failedCount = GetMessageCount(lines, FailTaskMessage);
+            string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 
120);
             var completedCount = GetMessageCount(lines, CompletedTaskMessage);
-            Assert.Equal(numTasks, failedCount + completedCount);
+            var failedCount = GetMessageCount(lines, FailTaskMessage);
+            Assert.Equal(numTasks, completedCount + failedCount);
             CleanUp(testFolder);
         }
 
         /// <summary>
         /// Same testing for running on YARN
         /// It sends close event for all the running tasks.
-        /// It first informs the Call method to stop.
-        /// If Call method is running properly, it will respect to this flag 
and will return properly, that will end up ICompletedTask event.
-        ////If Call method is hung some where and cannot be returned, the 
close handler will throw exception, that would cause IFailedTask event.
-        /// As we are testing IMRU Task not a test task, the behavior is not 
deterministic. It can be CompletedTask or FailedTask
-        /// No matter how the task is closed, the total number of completed 
task and failed task should be equal to the 
-        /// total number of the tasks.
         /// </summary>
         [Fact(Skip = "Requires Yarn")]
         public void TestTaskCloseOnLocalRuntimeOnYarn()
@@ -83,15 +80,15 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             const int chunkSize = 2;
             const int dims = 50;
             const int iterations = 200;
-            const int mapperMemory = 5120;
-            const int updateTaskMemory = 5120;
+            const int mapperMemory = 512;
+            const int updateTaskMemory = 512;
             const int numTasks = 4;
             TestBroadCastAndReduce(true, numTasks, chunkSize, dims, 
iterations, mapperMemory, updateTaskMemory);
         }
 
         /// <summary>
         /// This method overrides base class method and defines its own event 
handlers for driver. 
-        /// It uses its own RunningTaskHandler, FailedTaskHandler and 
CompletedTaskHandler so that to simulate the test scenarios 
+        /// It uses its own RunningTaskHandler, CompletedTaskHandler, FailedTaskHandler and FailedEvaluatorHandler to simulate the test scenarios 
         /// and verify the test result. 
         /// Rest of the event handlers use those from IMRUDriver. In 
IActiveContext handler in IMRUDriver, IMRU tasks are bound for the test.
         /// </summary>
@@ -112,7 +109,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                 .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted,
                     GenericType<TestHandlers>.Class)
                 .Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed,
-                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
+                    GenericType<TestHandlers>.Class)
                 .Set(REEF.Driver.DriverConfiguration.OnContextFailed,
                     GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
                 .Set(REEF.Driver.DriverConfiguration.OnTaskFailed,
@@ -124,43 +121,112 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         }
 
         /// <summary>
+        /// Mapper function configuration: merges the TCP client configuration into the base configuration.
+        /// </summary>
+        /// <returns></returns>
+        protected override IConfiguration BuildMapperFunctionConfig()
+        {
+            return Configurations.Merge(GetTcpConfiguration(), 
base.BuildMapperFunctionConfig());
+        }
+
+        /// <summary>
+        /// Update function configuration: merges the TCP client configuration into the base configuration.
+        /// </summary>
+        /// <returns></returns>
+        protected override IConfiguration BuildUpdateFunctionConfig()
+        {
+            return Configurations.Merge(GetTcpConfiguration(), 
base.BuildUpdateFunctionConfig());
+        }
+
+        /// <summary>
+        /// Overrides the default TCP client retry policy (MaxConnectionRetry = 5, SleepTime = 1000)
+        /// </summary>
+        /// <returns></returns>
+        private IConfiguration GetTcpConfiguration()
+        {
+            return TcpClientConfigurationModule.ConfigurationModule
+                .Set(TcpClientConfigurationModule.MaxConnectionRetry, "5")
+                .Set(TcpClientConfigurationModule.SleepTime, "1000")
+                .Build();
+        }
+
+        /// <summary>
         /// Test handlers
         /// </summary>
-        internal sealed class TestHandlers : IObserver<IRunningTask>, 
IObserver<IFailedTask>, IObserver<ICompletedTask>
+        internal sealed class TestHandlers : IObserver<IRunningTask>, 
IObserver<ICompletedTask>, IObserver<IFailedTask>, IObserver<IFailedEvaluator>
         {
+            private readonly ISet<IRunningTask> _runningTasks = new 
HashSet<IRunningTask>();
+            private readonly object _lock = new object();
+
             [Inject]
             private TestHandlers()
             {
             }
 
             /// <summary>
-            /// Log the task id and dispose the context
+            /// Adds the running task to _runningTasks; whenever the set reaches four tasks, disposes the task just received and removes it from the set
             /// </summary>
             public void OnNext(IRunningTask value)
             {
-                Logger.Log(Level.Info, "Received running task, closing it" + 
value.Id);
-                
value.Dispose(ByteUtilities.StringToByteArrays(TaskManager.CloseTaskByDriver));
+                lock (_lock)
+                {
+                    Logger.Log(Level.Info, "Received running task:" + 
value.Id);
+                    _runningTasks.Add(value);
+                    if (_runningTasks.Count == 4)
+                    {
+                        Logger.Log(Level.Info, "Dispose running task from 
driver:" + value.Id);
+                        
value.Dispose(ByteUtilities.StringToByteArrays(TaskManager.CloseTaskByDriver));
+                        _runningTasks.Remove(value);
+                    }
+                }
             }
 
             /// <summary>
-            /// Validate the event and dispose the context
+            /// Logs FailTaskMessage with the task id,
+            /// closes the remaining running tasks, then disposes the failed task's active context
             /// </summary>
             /// <param name="value"></param>
             public void OnNext(IFailedTask value)
             {
-                Logger.Log(Level.Info, FailTaskMessage + value.Id);
-                var failedExeption = 
ByteUtilities.ByteArraysToString(value.Data.Value);
-                Assert.Contains(TaskManager.TaskKilledByDriver, 
failedExeption);
-                value.GetActiveContext().Value.Dispose();
+                lock (_lock)
+                {
+                    Logger.Log(Level.Info, FailTaskMessage + value.Id);
+                    CloseRunningTasks();
+                    value.GetActiveContext().Value.Dispose();
+                }
+            }
+
+            /// <summary>
+            /// No failed evaluator is expected; throws an exception carrying FailEvaluatorMessage if one arrives
+            /// </summary>
+            /// <param name="value"></param>
+            public void OnNext(IFailedEvaluator value)
+            {
+                throw new Exception(FailEvaluatorMessage);
             }
 
             /// <summary>
-            /// Log the task id and dispose the context
+            /// Logs CompletedTaskMessage with the task id,
+            /// closes the remaining running tasks, then disposes the completed task's active context
             /// </summary>
             public void OnNext(ICompletedTask value)
             {
-                Logger.Log(Level.Info, CompletedTaskMessage + value.Id);
-                value.ActiveContext.Dispose();
+                lock (_lock)
+                {
+                    Logger.Log(Level.Info, CompletedTaskMessage + value.Id);
+                    CloseRunningTasks();
+                    value.ActiveContext.Dispose();
+                }
+            }
+
+            private void CloseRunningTasks()
+            {
+                foreach (var task in _runningTasks)
+                {
+                    Logger.Log(Level.Info, "Dispose running task from driver:" 
+ task.Id);
+                    
task.Dispose(ByteUtilities.StringToByteArrays(TaskManager.CloseTaskByDriver));
+                }
+                _runningTasks.Clear();
             }
 
             public void OnCompleted()

http://git-wip-us.apache.org/repos/asf/reef/blob/026b5ea6/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
index 149cc9b..4923940 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
@@ -252,11 +252,11 @@ namespace Org.Apache.REEF.Tests.Functional
             }
         }
 
-        internal string[] ReadLogFile(string logFileName, string subfolder = 
"driver", string testFolder = DefaultRuntimeFolder)
+        internal string[] ReadLogFile(string logFileName, string subfolder = 
"driver", string testFolder = DefaultRuntimeFolder, int retryCount = 60)
         {
             string fileName = string.Empty;
             string[] lines = null;
-            for (int i = 0; i < 60; i++)
+            for (int i = 0; i < retryCount; i++)
             {
                 try
                 {
@@ -266,12 +266,12 @@ namespace Org.Apache.REEF.Tests.Functional
                 }
                 catch (Exception e)
                 {
-                    if (i == 59)
+                    if (i == retryCount - 1)
                     {
                         // log only last exception before failure
                         Logger.Log(Level.Verbose, e.ToString());
                     }
-                    if (i < 59)
+                    if (i < retryCount - 1)
                     {
                         Thread.Sleep(SleepTime);
                     }

Reply via email to