Repository: flink Updated Branches: refs/heads/master 1e574750d -> 1aad5b759
http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index 702d656..9dd078d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -15,21 +15,65 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.runtime.jobmanager; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Status; +import akka.testkit.JavaTestKit; import com.typesafe.config.Config; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.AbstractJobVertex; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState; +import org.apache.flink.runtime.messages.TaskMessages.PartitionState; +import org.apache.flink.runtime.testingUtils.TestingCluster; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import scala.Some; import scala.Tuple2; import java.net.InetAddress; -import static org.junit.Assert.*; +import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED; +import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT; +import static org.apache.flink.runtime.testingUtils.TestingUtils.startTestingCluster; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class JobManagerTest { + private static ActorSystem system; + + @BeforeClass + public static void setup() { + system = AkkaUtils.createLocalActorSystem(new Configuration()); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(system); + } + @Test public void testNullHostnameGoesToLocalhost() { try { @@ -45,4 +89,113 @@ public class JobManagerTest { fail(e.getMessage()); } } + + /** + * Tests responses to partition state requests. + */ + @Test + public void testRequestPartitionState() throws Exception { + new JavaTestKit(system) {{ + // Setup + TestingCluster cluster = null; + + try { + cluster = startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT()); + + final IntermediateDataSetID rid = new IntermediateDataSetID(); + + // Create a task + final AbstractJobVertex sender = new AbstractJobVertex("Sender"); + sender.setParallelism(1); + sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block + sender.createAndAddResultDataSet(rid, PIPELINED); + + final JobGraph jobGraph = new JobGraph("Blocking test job", sender); + final JobID jid = jobGraph.getJobID(); + + final ActorRef jm = cluster.getJobManager(); + + // Submit the job and wait for all vertices to be running + jm.tell(new JobManagerMessages.SubmitJob(jobGraph, false), getTestActor()); + expectMsgClass(Status.Success.class); + + jm.tell(new WaitForAllVerticesToBeRunningOrFinished(jobGraph.getJobID()), + getTestActor()); + + expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class); + + // This is the mock execution ID of the task requesting the state of the partition + final ExecutionAttemptID receiver = new ExecutionAttemptID(); + + // Request the execution graph to get the runtime info + jm.tell(new RequestExecutionGraph(jid), getTestActor()); + + final ExecutionGraph eg = expectMsgClass(ExecutionGraphFound.class) + .executionGraph(); + + final ExecutionVertex vertex = eg.getJobVertex(sender.getID()) + .getTaskVertices()[0]; + + final IntermediateResultPartition partition = vertex.getProducedPartitions() + .values().iterator().next(); + + final ResultPartitionID partitionId = new ResultPartitionID( + partition.getPartitionId(), + vertex.getCurrentExecutionAttempt().getAttemptId()); + + // - The test ---------------------------------------------------------------------- + + // 1. All execution states + RequestPartitionState request = new RequestPartitionState( + jid, partitionId, receiver, rid); + + for (ExecutionState state : ExecutionState.values()) { + ExecutionGraphTestUtils.setVertexState(vertex, state); + + jm.tell(request, getTestActor()); + + PartitionState resp = expectMsgClass(PartitionState.class); + + assertEquals(request.taskExecutionId(), resp.taskExecutionId()); + assertEquals(request.taskResultId(), resp.taskResultId()); + assertEquals(request.partitionId().getPartitionId(), resp.partitionId()); + assertEquals(state, resp.state()); + } + + // 2. Non-existing execution + request = new RequestPartitionState(jid, new ResultPartitionID(), receiver, rid); + + jm.tell(request, getTestActor()); + + PartitionState resp = expectMsgClass(PartitionState.class); + + assertEquals(request.taskExecutionId(), resp.taskExecutionId()); + assertEquals(request.taskResultId(), resp.taskResultId()); + assertEquals(request.partitionId().getPartitionId(), resp.partitionId()); + assertNull(resp.state()); + + // 3. Non-existing job + request = new RequestPartitionState( + new JobID(), new ResultPartitionID(), receiver, rid); + + jm.tell(request, getTestActor()); + + resp = expectMsgClass(PartitionState.class); + + assertEquals(request.taskExecutionId(), resp.taskExecutionId()); + assertEquals(request.taskResultId(), resp.taskResultId()); + assertEquals(request.partitionId().getPartitionId(), resp.partitionId()); + assertNull(resp.state()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + if (cluster != null) { + cluster.shutdown(); + } + } + }}; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 131549b..dca3c58 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.memorymanager.MemoryManager; import org.apache.flink.runtime.messages.TaskManagerMessages; import org.junit.Test; import scala.Option; +import scala.Tuple2; import scala.concurrent.duration.FiniteDuration; import java.net.InetAddress; @@ -79,7 +80,8 @@ public class TaskManagerComponentsStartupShutdownTest { config); final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration( - 32, BUFFER_SIZE, IOManager.IOMode.SYNC, Option.<NettyConfig>empty()); + 32, BUFFER_SIZE, IOManager.IOMode.SYNC, Option.<NettyConfig>empty(), + new Tuple2<Integer, Integer>(0, 0)); final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost(), 10000); http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index 76ff86f..a308c81 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -27,49 +27,49 @@ import akka.japi.Creator; import akka.pattern.Patterns; import akka.testkit.JavaTestKit; import akka.util.Timeout; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; -import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionLocation; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.io.network.ConnectionID; +import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.Tasks; -import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.Messages; import org.apache.flink.runtime.messages.RegistrationMessages; import org.apache.flink.runtime.messages.TaskManagerMessages; -import org.apache.flink.runtime.messages.TaskMessages; import org.apache.flink.runtime.messages.TaskMessages.CancelTask; +import org.apache.flink.runtime.messages.TaskMessages.PartitionState; import org.apache.flink.runtime.messages.TaskMessages.SubmitTask; import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult; +import org.apache.flink.runtime.net.NetUtils; import org.apache.flink.runtime.testingUtils.TestingTaskManager; import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; - import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import scala.Option; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -78,6 +78,10 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import static org.apache.flink.runtime.messages.JobManagerMessages.ConsumerNotificationResult; +import static org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState; +import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers; +import static org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; @@ -160,11 +164,11 @@ public class TaskManagerTest { } while (System.currentTimeMillis() < deadline); // task should have switched to running - Object toRunning = new TaskMessages.UpdateTaskExecutionState( + Object toRunning = new UpdateTaskExecutionState( new TaskExecutionState(jid, eid, ExecutionState.RUNNING)); // task should have switched to finished - Object toFinished = new TaskMessages.UpdateTaskExecutionState( + Object toFinished = new UpdateTaskExecutionState( new TaskExecutionState(jid, eid, ExecutionState.FINISHED)); deadline = System.currentTimeMillis() + 10000; @@ -682,6 +686,91 @@ public class TaskManagerTest { }}; } + /** + * Tests that repeated {@link PartitionNotFoundException}s fail the receiver. + */ + @Test + public void testPartitionNotFound() throws Exception { + + new JavaTestKit(system){{ + + ActorRef jobManager = null; + ActorRef taskManager = null; + + try { + final IntermediateDataSetID resultId = new IntermediateDataSetID(); + + // Create the JM + jobManager = system.actorOf(Props.create( + new SimplePartitionStateLookupJobManagerCreator(resultId, getTestActor()))); + + final int dataPort = NetUtils.getAvailablePort(); + taskManager = createTaskManager(jobManager, true, false, dataPort); + + // --------------------------------------------------------------------------------- + + final ActorRef tm = taskManager; + + final JobID jid = new JobID(); + final JobVertexID vid = new JobVertexID(); + final ExecutionAttemptID eid = new ExecutionAttemptID(); + + final ResultPartitionID partitionId = new ResultPartitionID(); + + // Remote location (on the same TM though) for the partition + final ResultPartitionLocation loc = ResultPartitionLocation + .createRemote(new ConnectionID( + new InetSocketAddress("localhost", dataPort), 0)); + + final InputChannelDeploymentDescriptor[] icdd = + new InputChannelDeploymentDescriptor[] { + new InputChannelDeploymentDescriptor(partitionId, loc)}; + + final InputGateDeploymentDescriptor igdd = + new InputGateDeploymentDescriptor(resultId, 0, icdd); + + final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( + jid, vid, eid, "Receiver", 0, 1, + new Configuration(), new Configuration(), + Tasks.AgnosticReceiver.class.getName(), + Collections.<ResultPartitionDeploymentDescriptor>emptyList(), + Collections.singletonList(igdd), + Collections.<BlobKey>emptyList(), 0); + + new Within(d) { + @Override + protected void run() { + // Submit the task + tm.tell(new SubmitTask(tdd), getTestActor()); + expectMsgClass(Messages.getAcknowledge().getClass()); + + // Wait to be notified about the final execution state by the mock JM + TaskExecutionState msg = expectMsgClass(TaskExecutionState.class); + + // The task should fail after repeated requests + assertEquals(msg.getExecutionState(), ExecutionState.FAILED); + assertEquals(msg.getError(ClassLoader.getSystemClassLoader()).getClass(), + PartitionNotFoundException.class); + } + }; + } + catch(Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + if (taskManager != null) { + taskManager.tell(Kill.getInstance(), ActorRef.noSender()); + } + + if (jobManager != null) { + jobManager.tell(Kill.getInstance(), ActorRef.noSender()); + } + } + }}; + } + + // -------------------------------------------------------------------------------------------- public static class SimpleJobManager extends UntypedActor { @@ -693,7 +782,7 @@ public class TaskManagerTest { final ActorRef self = getSelf(); getSender().tell(new RegistrationMessages.AcknowledgeRegistration(self, iid, 12345), self); } - else if(message instanceof TaskMessages.UpdateTaskExecutionState){ + else if(message instanceof UpdateTaskExecutionState){ getSender().tell(true, getSelf()); } } @@ -703,8 +792,8 @@ public class TaskManagerTest { @Override public void onReceive(Object message) throws Exception { - if (message instanceof JobManagerMessages.ScheduleOrUpdateConsumers) { - getSender().tell(new JobManagerMessages.ConsumerNotificationResult(true, scala.Option.<Throwable>apply(null)), getSelf()); + if (message instanceof ScheduleOrUpdateConsumers) { + getSender().tell(new ConsumerNotificationResult(true, scala.Option.<Throwable>apply(null)), getSelf()); } else { super.onReceive(message); } @@ -721,9 +810,9 @@ public class TaskManagerTest { @Override public void onReceive(Object message) throws Exception{ - if (message instanceof TaskMessages.UpdateTaskExecutionState) { - TaskMessages.UpdateTaskExecutionState updateMsg = - (TaskMessages.UpdateTaskExecutionState) message; + if (message instanceof UpdateTaskExecutionState) { + UpdateTaskExecutionState updateMsg = + (UpdateTaskExecutionState) message; if(validIDs.contains(updateMsg.taskExecutionState().getID())) { getSender().tell(true, getSelf()); @@ -736,6 +825,40 @@ public class TaskManagerTest { } } + public static class SimplePartitionStateLookupJobManager extends SimpleJobManager { + + private final ActorRef testActor; + + public SimplePartitionStateLookupJobManager(ActorRef testActor) { + this.testActor = testActor; + } + + @Override + public void onReceive(Object message) throws Exception { + if (message instanceof RequestPartitionState) { + final RequestPartitionState msg = (RequestPartitionState) message; + + PartitionState resp = new PartitionState( + msg.taskExecutionId(), + msg.taskResultId(), + msg.partitionId().getPartitionId(), + ExecutionState.RUNNING); + + getSender().tell(resp, getSelf()); + } + else if (message instanceof UpdateTaskExecutionState) { + final TaskExecutionState msg = ((UpdateTaskExecutionState) message) + .taskExecutionState(); + + if (msg.getExecutionState().isTerminal()) { + testActor.tell(msg, self()); + } + } else { + super.onReceive(message); + } + } + } + public static class SimpleLookupJobManagerCreator implements Creator<SimpleLookupJobManager>{ @Override @@ -762,11 +885,30 @@ public class TaskManagerTest { } } + public static class SimplePartitionStateLookupJobManagerCreator implements Creator<SimplePartitionStateLookupJobManager>{ + + private final ActorRef testActor; + + public SimplePartitionStateLookupJobManagerCreator(IntermediateDataSetID dataSetId, ActorRef testActor) { + this.testActor = testActor; + } + + @Override + public SimplePartitionStateLookupJobManager create() throws Exception { + return new SimplePartitionStateLookupJobManager(testActor); + } + } + public static ActorRef createTaskManager(ActorRef jobManager, boolean waitForRegistration) { + return createTaskManager(jobManager, waitForRegistration, true, NetUtils.getAvailablePort()); + } + + public static ActorRef createTaskManager(ActorRef jobManager, boolean waitForRegistration, boolean useLocalCommunication, int dataPort) { ActorRef taskManager = null; try { Configuration cfg = new Configuration(); cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10); + cfg.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort); Option<String> jobMangerUrl = Option.apply(jobManager.path().toString()); @@ -774,7 +916,7 @@ public class TaskManagerTest { cfg, system, "localhost", Option.<String>empty(), jobMangerUrl, - true, TestingTaskManager.class); + useLocalCommunication, TestingTaskManager.class); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index 4713bae..bcc7ffe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -23,6 +23,7 @@ import akka.actor.ActorSystem; import akka.actor.Kill; import akka.actor.Props; +import com.google.common.collect.Maps; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobKey; @@ -38,7 +39,10 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memorymanager.MemoryManager; @@ -51,7 +55,9 @@ import org.junit.BeforeClass; import org.junit.Test; import scala.concurrent.duration.FiniteDuration; +import java.lang.reflect.Field; import java.util.Collections; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -64,8 +70,11 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** @@ -560,7 +569,7 @@ public class TaskTest { } @Test - public void testExecutionFailesAfterTaskMarkedFailed() { + public void testExecutionFailsAfterTaskMarkedFailed() { try { Task task = createTask(InvokableWithExceptionOnTrigger.class); task.registerExecutionListener(listenerActor); @@ -595,6 +604,78 @@ public class TaskTest { } } + @Test + public void testOnPartitionStateUpdate() throws Exception { + IntermediateDataSetID resultId = new IntermediateDataSetID(); + ResultPartitionID partitionId = new ResultPartitionID(); + + SingleInputGate inputGate = mock(SingleInputGate.class); + when(inputGate.getConsumedResultId()).thenReturn(resultId); + + final Task task = createTask(InvokableBlockingInInvoke.class); + + // Set the mock input gate + setInputGate(task, inputGate); + + // Expected task state for each partition state + final Map<ExecutionState, ExecutionState> expected = Maps + .newHashMapWithExpectedSize(ExecutionState.values().length); + + // Fail the task for unexpected states + for (ExecutionState state : ExecutionState.values()) { + expected.put(state, ExecutionState.FAILED); + } + + expected.put(ExecutionState.RUNNING, ExecutionState.RUNNING); + + expected.put(ExecutionState.CANCELED, ExecutionState.CANCELING); + expected.put(ExecutionState.CANCELING, ExecutionState.CANCELING); + expected.put(ExecutionState.FAILED, ExecutionState.CANCELING); + + for (ExecutionState state : ExecutionState.values()) { + setState(task, ExecutionState.RUNNING); + + task.onPartitionStateUpdate(resultId, partitionId.getPartitionId(), state); + + ExecutionState newTaskState = task.getExecutionState(); + + assertEquals(expected.get(state), newTaskState); + } + + verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId())); + } + + // ------------------------------------------------------------------------ + + private void setInputGate(Task task, SingleInputGate inputGate) { + try { + Field f = Task.class.getDeclaredField("inputGates"); + f.setAccessible(true); + f.set(task, new SingleInputGate[]{inputGate}); + + Map<IntermediateDataSetID, SingleInputGate> byId = Maps.newHashMapWithExpectedSize(1); + byId.put(inputGate.getConsumedResultId(), inputGate); + + f = Task.class.getDeclaredField("inputGatesById"); + f.setAccessible(true); + f.set(task, byId); + } + catch (Exception e) { + throw new RuntimeException("Modifying the task state failed", e); + } + } + + private void setState(Task task, ExecutionState state) { + try { + Field f = Task.class.getDeclaredField("executionState"); + f.setAccessible(true); + f.set(task, state); + } + catch (Exception e) { + throw new RuntimeException("Modifying the task state failed", e); + } + } + private Task createTask(Class<? extends AbstractInvokable> invokable) { LibraryCacheManager libCache = mock(LibraryCacheManager.class); when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader()); http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java index 9069573..f3c061c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java @@ -30,20 +30,20 @@ import org.junit.Assert; import org.junit.Test; public class ClassLoaderITCase { - + private static final String INPUT_SPLITS_PROG_JAR_FILE = "target/customsplit-test-jar.jar"; private static final String STREAMING_PROG_JAR_FILE = "target/streamingclassloader-test-jar.jar"; private static final String KMEANS_JAR_PATH = "target/kmeans-test-jar.jar"; - + @Test public void testJobWithCustomInputFormat() { try { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2); - + ForkableFlinkMiniCluster testCluster = new ForkableFlinkMiniCluster(config, false); try { http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java index 41b84e6..01f1b00 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java @@ -27,7 +27,6 @@ public class WordCountITCase extends JavaProgramTestBase { protected String textPath; protected String resultPath; - @Override protected void preSubmit() throws Exception { textPath = createTempFile("text.txt", WordCountData.TEXT);