Repository: flink
Updated Branches:
  refs/heads/master 1e574750d -> 1aad5b759


http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 702d656..9dd078d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -15,21 +15,65 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.jobmanager;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Status;
+import akka.testkit.JavaTestKit;
 import com.typesafe.config.Config;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
+import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import scala.Some;
 import scala.Tuple2;
 
 import java.net.InetAddress;
 
-import static org.junit.Assert.*;
+import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
+import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
+import static org.apache.flink.runtime.testingUtils.TestingUtils.startTestingCluster;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class JobManagerTest {
 
+       private static ActorSystem system;
+
+       @BeforeClass
+       public static void setup() {
+               system = AkkaUtils.createLocalActorSystem(new Configuration());
+       }
+
+       @AfterClass
+       public static void teardown() {
+               JavaTestKit.shutdownActorSystem(system);
+       }
+
        @Test
        public void testNullHostnameGoesToLocalhost() {
                try {
@@ -45,4 +89,113 @@ public class JobManagerTest {
                        fail(e.getMessage());
                }
        }
+
+       /**
+        * Tests responses to partition state requests.
+        */
+       @Test
+       public void testRequestPartitionState() throws Exception {
+               new JavaTestKit(system) {{
+                       // Setup
+                       TestingCluster cluster = null;
+
+                       try {
+                               cluster = startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT());
+
+                               final IntermediateDataSetID rid = new IntermediateDataSetID();
+
+                               // Create a task
+                               final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+                               sender.setParallelism(1);
+                               sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+                               sender.createAndAddResultDataSet(rid, PIPELINED);
+
+                               final JobGraph jobGraph = new JobGraph("Blocking test job", sender);
+                               final JobID jid = jobGraph.getJobID();
+
+                               final ActorRef jm = cluster.getJobManager();
+
+                               // Submit the job and wait for all vertices to be running
+                               jm.tell(new JobManagerMessages.SubmitJob(jobGraph, false), getTestActor());
+                               expectMsgClass(Status.Success.class);
+
+                               jm.tell(new WaitForAllVerticesToBeRunningOrFinished(jobGraph.getJobID()),
+                                               getTestActor());
+
+                               expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
+
+                               // This is the mock execution ID of the task requesting the state of the partition
+                               final ExecutionAttemptID receiver = new ExecutionAttemptID();
+
+                               // Request the execution graph to get the runtime info
+                               jm.tell(new RequestExecutionGraph(jid), getTestActor());
+
+                               final ExecutionGraph eg = expectMsgClass(ExecutionGraphFound.class)
+                                               .executionGraph();
+
+                               final ExecutionVertex vertex = eg.getJobVertex(sender.getID())
+                                               .getTaskVertices()[0];
+
+                               final IntermediateResultPartition partition = vertex.getProducedPartitions()
+                                               .values().iterator().next();
+
+                               final ResultPartitionID partitionId = new ResultPartitionID(
+                                               partition.getPartitionId(),
+                                               vertex.getCurrentExecutionAttempt().getAttemptId());
+
+                               // - The test ----------------------------------------------------------------------
+
+                               // 1. All execution states
+                               RequestPartitionState request = new RequestPartitionState(
+                                               jid, partitionId, receiver, rid);
+
+                               for (ExecutionState state : ExecutionState.values()) {
+                                       ExecutionGraphTestUtils.setVertexState(vertex, state);
+
+                                       jm.tell(request, getTestActor());
+
+                                       PartitionState resp = expectMsgClass(PartitionState.class);
+
+                                       assertEquals(request.taskExecutionId(), resp.taskExecutionId());
+                                       assertEquals(request.taskResultId(), resp.taskResultId());
+                                       assertEquals(request.partitionId().getPartitionId(), resp.partitionId());
+                                       assertEquals(state, resp.state());
+                               }
+
+                               // 2. Non-existing execution
+                               request = new RequestPartitionState(jid, new ResultPartitionID(), receiver, rid);
+
+                               jm.tell(request, getTestActor());
+
+                               PartitionState resp = expectMsgClass(PartitionState.class);
+
+                               assertEquals(request.taskExecutionId(), resp.taskExecutionId());
+                               assertEquals(request.taskResultId(), resp.taskResultId());
+                               assertEquals(request.partitionId().getPartitionId(), resp.partitionId());
+                               assertNull(resp.state());
+
+                               // 3. Non-existing job
+                               request = new RequestPartitionState(
+                                               new JobID(), new ResultPartitionID(), receiver, rid);
+
+                               jm.tell(request, getTestActor());
+
+                               resp = expectMsgClass(PartitionState.class);
+
+                               assertEquals(request.taskExecutionId(), resp.taskExecutionId());
+                               assertEquals(request.taskResultId(), resp.taskResultId());
+                               assertEquals(request.partitionId().getPartitionId(), resp.partitionId());
+                               assertNull(resp.state());
+                       }
+                       catch (Exception e) {
+                               e.printStackTrace();
+                               fail(e.getMessage());
+                       }
+                       finally {
+                               if (cluster != null) {
+                                       cluster.shutdown();
+                               }
+                       }
+               }};
+       }
 }

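For readers skimming the change, the round trip exercised by testRequestPartitionState above boils down to a single ask/reply against the JobManager actor. The following is a minimal sketch of that exchange and is not part of the commit: it uses only the RequestPartitionState and PartitionState message types added in this change, plus Akka's standard ask pattern; the sketch's class and method names are illustrative.

import akka.actor.ActorRef;
import akka.pattern.Patterns;
import akka.util.Timeout;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
import org.apache.flink.runtime.messages.TaskMessages.PartitionState;

import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

import java.util.concurrent.TimeUnit;

public class PartitionStateLookupSketch {

	/** Asks the given JobManager actor for the state of one result partition. */
	public static PartitionState lookup(
			ActorRef jobManager,
			JobID jobId,
			ResultPartitionID partitionId,
			ExecutionAttemptID requestingExecution,
			IntermediateDataSetID resultId) throws Exception {

		FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);

		RequestPartitionState request = new RequestPartitionState(
				jobId, partitionId, requestingExecution, resultId);

		// The JobManager replies with a PartitionState message; as the test above
		// shows, state() is null when the producing execution or the job is unknown.
		Future<Object> reply = Patterns.ask(jobManager, request, new Timeout(timeout));

		return (PartitionState) Await.result(reply, timeout);
	}
}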
http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 131549b..dca3c58 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.junit.Test;
 import scala.Option;
+import scala.Tuple2;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.net.InetAddress;
@@ -79,7 +80,8 @@ public class TaskManagerComponentsStartupShutdownTest {
                                        config);
 
                        final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
-                                       32, BUFFER_SIZE, IOManager.IOMode.SYNC, Option.<NettyConfig>empty());
+                                       32, BUFFER_SIZE, IOManager.IOMode.SYNC, Option.<NettyConfig>empty(),
+                                       new Tuple2<Integer, Integer>(0, 0));
 
                        final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost(), 10000);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 76ff86f..a308c81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -27,49 +27,49 @@ import akka.japi.Creator;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import akka.util.Timeout;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.Tasks;
-import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.RegistrationMessages;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
+import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
 import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
+import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.runtime.testingUtils.TestingTaskManager;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
-
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -78,6 +78,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.runtime.messages.JobManagerMessages.ConsumerNotificationResult;
+import static org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
+import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
+import static org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
@@ -160,11 +164,11 @@ public class TaskManagerTest {
                                                } while (System.currentTimeMillis() < deadline);

                                                // task should have switched to running
-                                               Object toRunning = new TaskMessages.UpdateTaskExecutionState(
+                                               Object toRunning = new UpdateTaskExecutionState(
                                                                new TaskExecutionState(jid, eid, ExecutionState.RUNNING));

                                                // task should have switched to finished
-                                               Object toFinished = new TaskMessages.UpdateTaskExecutionState(
+                                               Object toFinished = new UpdateTaskExecutionState(
                                                                new TaskExecutionState(jid, eid, ExecutionState.FINISHED));

                                                deadline = System.currentTimeMillis() + 10000;
@@ -682,6 +686,91 @@ public class TaskManagerTest {
                }};
        }
 
+       /**
+        * Tests that repeated {@link PartitionNotFoundException}s fail the receiver.
+        */
+       @Test
+       public void testPartitionNotFound() throws Exception {
+
+               new JavaTestKit(system){{
+
+                       ActorRef jobManager = null;
+                       ActorRef taskManager = null;
+
+                       try {
+                               final IntermediateDataSetID resultId = new IntermediateDataSetID();
+
+                               // Create the JM
+                               jobManager = system.actorOf(Props.create(
+                                               new SimplePartitionStateLookupJobManagerCreator(resultId, getTestActor())));
+
+                               final int dataPort = NetUtils.getAvailablePort();
+                               taskManager = createTaskManager(jobManager, true, false, dataPort);
+
+                               // ---------------------------------------------------------------------------------
+
+                               final ActorRef tm = taskManager;
+
+                               final JobID jid = new JobID();
+                               final JobVertexID vid = new JobVertexID();
+                               final ExecutionAttemptID eid = new ExecutionAttemptID();
+
+                               final ResultPartitionID partitionId = new ResultPartitionID();
+
+                               // Remote location (on the same TM though) for the partition
+                               final ResultPartitionLocation loc = ResultPartitionLocation
+                                               .createRemote(new ConnectionID(
+                                                               new InetSocketAddress("localhost", dataPort), 0));
+
+                               final InputChannelDeploymentDescriptor[] icdd =
+                                               new InputChannelDeploymentDescriptor[] {
+                                                               new InputChannelDeploymentDescriptor(partitionId, loc)};
+
+                               final InputGateDeploymentDescriptor igdd =
+                                               new InputGateDeploymentDescriptor(resultId, 0, icdd);
+
+                               final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
+                                               jid, vid, eid, "Receiver", 0, 1,
+                                               new Configuration(), new Configuration(),
+                                               Tasks.AgnosticReceiver.class.getName(),
+                                               Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+                                               Collections.singletonList(igdd),
+                                               Collections.<BlobKey>emptyList(), 0);
+
+                               new Within(d) {
+                                       @Override
+                                       protected void run() {
+                                               // Submit the task
+                                               tm.tell(new SubmitTask(tdd), getTestActor());
+                                               expectMsgClass(Messages.getAcknowledge().getClass());
+
+                                               // Wait to be notified about the final execution state by the mock JM
+                                               TaskExecutionState msg = expectMsgClass(TaskExecutionState.class);
+
+                                               // The task should fail after repeated requests
+                                               assertEquals(msg.getExecutionState(), ExecutionState.FAILED);
+                                               assertEquals(msg.getError(ClassLoader.getSystemClassLoader()).getClass(),
+                                                               PartitionNotFoundException.class);
+                                       }
+                               };
+                       }
+                       catch(Exception e) {
+                               e.printStackTrace();
+                               fail(e.getMessage());
+                       }
+                       finally {
+                               if (taskManager != null) {
+                                       taskManager.tell(Kill.getInstance(), ActorRef.noSender());
+                               }
+
+                               if (jobManager != null) {
+                                       jobManager.tell(Kill.getInstance(), ActorRef.noSender());
+                               }
+                       }
+               }};
+       }
+
+
        // --------------------------------------------------------------------------------------------
 
        public static class SimpleJobManager extends UntypedActor {
@@ -693,7 +782,7 @@ public class TaskManagerTest {
                                final ActorRef self = getSelf();
                                getSender().tell(new RegistrationMessages.AcknowledgeRegistration(self, iid, 12345), self);
                        }
-                       else if(message instanceof TaskMessages.UpdateTaskExecutionState){
+                       else if(message instanceof UpdateTaskExecutionState){
                                getSender().tell(true, getSelf());
                        }
                }
@@ -703,8 +792,8 @@ public class TaskManagerTest {
 
                @Override
                public void onReceive(Object message) throws Exception {
-                       if (message instanceof JobManagerMessages.ScheduleOrUpdateConsumers) {
-                               getSender().tell(new JobManagerMessages.ConsumerNotificationResult(true, scala.Option.<Throwable>apply(null)), getSelf());
+                       if (message instanceof ScheduleOrUpdateConsumers) {
+                               getSender().tell(new ConsumerNotificationResult(true, scala.Option.<Throwable>apply(null)), getSelf());
                        } else {
                                super.onReceive(message);
                        }
@@ -721,9 +810,9 @@ public class TaskManagerTest {
 
                @Override
                public void onReceive(Object message) throws Exception{
-                       if (message instanceof TaskMessages.UpdateTaskExecutionState) {
-                               TaskMessages.UpdateTaskExecutionState updateMsg =
-                                               (TaskMessages.UpdateTaskExecutionState) message;
+                       if (message instanceof UpdateTaskExecutionState) {
+                               UpdateTaskExecutionState updateMsg =
+                                               (UpdateTaskExecutionState) message;

                                if(validIDs.contains(updateMsg.taskExecutionState().getID())) {
                                        getSender().tell(true, getSelf());
@@ -736,6 +825,40 @@ public class TaskManagerTest {
                }
        }
 
+       public static class SimplePartitionStateLookupJobManager extends SimpleJobManager {
+
+               private final ActorRef testActor;
+
+               public SimplePartitionStateLookupJobManager(ActorRef testActor) {
+                       this.testActor = testActor;
+               }
+
+               @Override
+               public void onReceive(Object message) throws Exception {
+                       if (message instanceof RequestPartitionState) {
+                               final RequestPartitionState msg = (RequestPartitionState) message;
+
+                               PartitionState resp = new PartitionState(
+                                               msg.taskExecutionId(),
+                                               msg.taskResultId(),
+                                               msg.partitionId().getPartitionId(),
+                                               ExecutionState.RUNNING);
+
+                               getSender().tell(resp, getSelf());
+                       }
+                       else if (message instanceof UpdateTaskExecutionState) {
+                               final TaskExecutionState msg = ((UpdateTaskExecutionState) message)
+                                               .taskExecutionState();
+
+                               if (msg.getExecutionState().isTerminal()) {
+                                       testActor.tell(msg, self());
+                               }
+                       } else {
+                               super.onReceive(message);
+                       }
+               }
+       }
+
        public static class SimpleLookupJobManagerCreator implements Creator<SimpleLookupJobManager>{
 
                @Override
@@ -762,11 +885,30 @@ public class TaskManagerTest {
                }
        }
 
+       public static class SimplePartitionStateLookupJobManagerCreator implements Creator<SimplePartitionStateLookupJobManager>{
+
+               private final ActorRef testActor;
+
+               public SimplePartitionStateLookupJobManagerCreator(IntermediateDataSetID dataSetId, ActorRef testActor) {
+                       this.testActor = testActor;
+               }
+
+               @Override
+               public SimplePartitionStateLookupJobManager create() throws Exception {
+                       return new SimplePartitionStateLookupJobManager(testActor);
+               }
+       }
+
        public static ActorRef createTaskManager(ActorRef jobManager, boolean waitForRegistration) {
+               return createTaskManager(jobManager, waitForRegistration, true, NetUtils.getAvailablePort());
+       }
+
+       public static ActorRef createTaskManager(ActorRef jobManager, boolean waitForRegistration, boolean useLocalCommunication, int dataPort) {
                ActorRef taskManager = null;
                try {
                        Configuration cfg = new Configuration();
                        cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10);
+                       cfg.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);

                        Option<String> jobMangerUrl = Option.apply(jobManager.path().toString());
 
@@ -774,7 +916,7 @@ public class TaskManagerTest {
                                        cfg, system, "localhost",
                                        Option.<String>empty(),
                                        jobMangerUrl,
-                                       true, TestingTaskManager.class);
+                                       useLocalCommunication, TestingTaskManager.class);
                }
                catch (Exception e) {
                        e.printStackTrace();

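As a usage note, not part of the commit: the four-argument createTaskManager overload introduced above is what lets a test disable local in-JVM communication and pin the TaskManager's data port. A hypothetical call site, assuming a jobManager ActorRef already exists in the test's actor system:

		// Pick a free port for the TaskManager's data connections (helper used in the diff above).
		final int dataPort = NetUtils.getAvailablePort();

		// waitForRegistration = true, useLocalCommunication = false: the TaskManager
		// registers at the JobManager and serves partitions over a real network port,
		// which is what testPartitionNotFound relies on to trigger remote partition requests.
		final ActorRef taskManager = createTaskManager(jobManager, true, false, dataPort);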
http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 4713bae..bcc7ffe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -23,6 +23,7 @@ import akka.actor.ActorSystem;
 import akka.actor.Kill;
 import akka.actor.Props;
 
+import com.google.common.collect.Maps;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -38,7 +39,10 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -51,7 +55,9 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.lang.reflect.Field;
 import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -64,8 +70,11 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
@@ -560,7 +569,7 @@ public class TaskTest {
        }
 
        @Test
-       public void testExecutionFailesAfterTaskMarkedFailed() {
+       public void testExecutionFailsAfterTaskMarkedFailed() {
                try {
                        Task task = createTask(InvokableWithExceptionOnTrigger.class);
                        task.registerExecutionListener(listenerActor);
@@ -595,6 +604,78 @@ public class TaskTest {
                }
        }
 
+       @Test
+       public void testOnPartitionStateUpdate() throws Exception {
+               IntermediateDataSetID resultId = new IntermediateDataSetID();
+               ResultPartitionID partitionId = new ResultPartitionID();
+
+               SingleInputGate inputGate = mock(SingleInputGate.class);
+               when(inputGate.getConsumedResultId()).thenReturn(resultId);
+
+               final Task task = createTask(InvokableBlockingInInvoke.class);
+
+               // Set the mock input gate
+               setInputGate(task, inputGate);
+
+               // Expected task state for each partition state
+               final Map<ExecutionState, ExecutionState> expected = Maps
+                               .newHashMapWithExpectedSize(ExecutionState.values().length);
+
+               // Fail the task for unexpected states
+               for (ExecutionState state : ExecutionState.values()) {
+                       expected.put(state, ExecutionState.FAILED);
+               }
+
+               expected.put(ExecutionState.RUNNING, ExecutionState.RUNNING);
+
+               expected.put(ExecutionState.CANCELED, ExecutionState.CANCELING);
+               expected.put(ExecutionState.CANCELING, ExecutionState.CANCELING);
+               expected.put(ExecutionState.FAILED, ExecutionState.CANCELING);
+
+               for (ExecutionState state : ExecutionState.values()) {
+                       setState(task, ExecutionState.RUNNING);
+
+                       task.onPartitionStateUpdate(resultId, partitionId.getPartitionId(), state);
+
+                       ExecutionState newTaskState = task.getExecutionState();
+
+                       assertEquals(expected.get(state), newTaskState);
+               }
+
+               verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
+       }
+
+       // ------------------------------------------------------------------------
+
+       private void setInputGate(Task task, SingleInputGate inputGate) {
+               try {
+                       Field f = Task.class.getDeclaredField("inputGates");
+                       f.setAccessible(true);
+                       f.set(task, new SingleInputGate[]{inputGate});
+
+                       Map<IntermediateDataSetID, SingleInputGate> byId = Maps.newHashMapWithExpectedSize(1);
+                       byId.put(inputGate.getConsumedResultId(), inputGate);
+
+                       f = Task.class.getDeclaredField("inputGatesById");
+                       f.setAccessible(true);
+                       f.set(task, byId);
+               }
+               catch (Exception e) {
+                       throw new RuntimeException("Modifying the task state failed", e);
+               }
+       }
+
+       private void setState(Task task, ExecutionState state) {
+               try {
+                       Field f = Task.class.getDeclaredField("executionState");
+                       f.setAccessible(true);
+                       f.set(task, state);
+               }
+               catch (Exception e) {
+                       throw new RuntimeException("Modifying the task state failed", e);
+               }
+       }
+
        private Task createTask(Class<? extends AbstractInvokable> invokable) {
                LibraryCacheManager libCache = mock(LibraryCacheManager.class);
                when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());

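The setInputGate and setState helpers above swap mocks and states into private Task fields through plain reflection. A small generic sketch of that pattern follows, for reference only; the class and method names here are illustrative and not part of the commit:

import java.lang.reflect.Field;

public final class PrivateFieldSetter {

	/** Overwrites a private instance field declared on the target's class; meant for test fixtures only. */
	public static void set(Object target, String fieldName, Object value) {
		try {
			Field f = target.getClass().getDeclaredField(fieldName);
			f.setAccessible(true);
			f.set(target, value);
		}
		catch (Exception e) {
			throw new RuntimeException("Setting field '" + fieldName + "' failed", e);
		}
	}
}

With this, the diff's setState(task, ExecutionState.RUNNING) would be equivalent to PrivateFieldSetter.set(task, "executionState", ExecutionState.RUNNING).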
http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 9069573..f3c061c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -30,20 +30,20 @@ import org.junit.Assert;
 import org.junit.Test;
 
 public class ClassLoaderITCase {
-       
+
        private static final String INPUT_SPLITS_PROG_JAR_FILE = "target/customsplit-test-jar.jar";
 
        private static final String STREAMING_PROG_JAR_FILE = "target/streamingclassloader-test-jar.jar";
 
        private static final String KMEANS_JAR_PATH = "target/kmeans-test-jar.jar";
-       
+
        @Test
        public void testJobWithCustomInputFormat() {
                try {
                        Configuration config = new Configuration();
                        config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
                        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
-                       
+
                        ForkableFlinkMiniCluster testCluster = new ForkableFlinkMiniCluster(config, false);
 
                        try {

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
index 41b84e6..01f1b00 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
@@ -27,7 +27,6 @@ public class WordCountITCase extends JavaProgramTestBase {
        protected String textPath;
        protected String resultPath;
 
-
        @Override
        protected void preSubmit() throws Exception {
                textPath = createTempFile("text.txt", WordCountData.TEXT);
