Repository: flink
Updated Branches:
  refs/heads/master 034d9a3ab -> 2dfd463e2


http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index 236e922..e4d0f65 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -41,12 +41,10 @@ import 
org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.JobStatusResponse;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
 import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
@@ -70,7 +68,6 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
-import java.util.List;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -168,91 +165,6 @@ public class JobManagerHAJobGraphRecoveryITCase extends 
TestLogger {
        }
 
        /**
-        * Tests that submissions to non-leaders are handled.
-        */
-       @Test
-       public void testSubmitJobToNonLeader() throws Exception {
-               Configuration config = 
ZooKeeperTestUtils.createZooKeeperHAConfig(
-                               ZooKeeper.getConnectString(), 
FileStateBackendBasePath.getPath());
-
-               // Configure the cluster
-               config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 2);
-               config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-
-               TestingCluster flink = new TestingCluster(config, false, false);
-
-               try {
-                       final Deadline deadline = TestTimeOut.fromNow();
-
-                       // Start the JobManager and TaskManager
-                       flink.start(true);
-
-                       JobGraph jobGraph = createBlockingJobGraph();
-
-                       List<ActorRef> bothJobManagers = 
flink.getJobManagersAsJava();
-
-                       ActorGateway leadingJobManager = 
flink.getLeaderGateway(deadline.timeLeft());
-
-                       ActorGateway nonLeadingJobManager;
-                       if 
(bothJobManagers.get(0).equals(leadingJobManager.actor())) {
-                               nonLeadingJobManager = new 
AkkaActorGateway(bothJobManagers.get(1), null);
-                       }
-                       else {
-                               nonLeadingJobManager = new 
AkkaActorGateway(bothJobManagers.get(0), null);
-                       }
-
-                       log.info("Leading job manager: " + leadingJobManager);
-                       log.info("Non-leading job manager: " + 
nonLeadingJobManager);
-
-                       // Submit the job
-                       nonLeadingJobManager.tell(new SubmitJob(jobGraph, 
ListeningBehaviour.DETACHED));
-
-                       log.info("Submitted job graph to " + 
nonLeadingJobManager);
-
-                       // Wait for the job to start. We are asking the 
*leading** JM here although we've
-                       // submitted the job to the non-leading JM. This is the 
behaviour under test.
-                       
JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), 
JobStatus.RUNNING,
-                                       leadingJobManager, deadline.timeLeft());
-
-                       log.info("Wait that the non-leader removes the 
submitted job.");
-
-                       // Make sure that the **non-leading** JM has actually 
removed the job graph from its
-                       // local state.
-                       boolean success = false;
-                       while (!success && deadline.hasTimeLeft()) {
-                               JobStatusResponse jobStatusResponse = 
JobManagerActorTestUtils.requestJobStatus(
-                                               jobGraph.getJobID(), 
nonLeadingJobManager, deadline.timeLeft());
-
-                               if (jobStatusResponse instanceof 
JobManagerMessages.JobNotFound) {
-                                       success = true;
-                               }
-                               else {
-                                       
log.info(((JobManagerMessages.CurrentJobStatus)jobStatusResponse).status().toString());
-                                       Thread.sleep(100);
-                               }
-                       }
-
-                       if (!success) {
-                               fail("Non-leading JM was still holding 
reference to the job graph.");
-                       }
-
-                       Future<Object> jobRemoved = leadingJobManager.ask(
-                               new 
TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()),
-                               deadline.timeLeft());
-
-                       leadingJobManager.tell(new 
JobManagerMessages.CancelJob(jobGraph.getJobID()));
-
-                       Await.ready(jobRemoved, deadline.timeLeft());
-               }
-               finally {
-                       flink.shutdown();
-               }
-
-               // Verify that everything is clean
-               verifyCleanRecoveryState(config);
-       }
-
-       /**
         * Tests that clients receive updates after recovery by a new leader.
         */
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
index 440cfff..e38fab4 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
@@ -24,6 +24,7 @@ import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -75,7 +76,7 @@ public class LocalFlinkMiniClusterITCase {
                        final ActorGateway jmGateway = 
miniCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 
                        new JavaTestKit(system) {{
-                               final ActorGateway selfGateway = new 
AkkaActorGateway(getRef(), null);
+                               final ActorGateway selfGateway = new 
AkkaActorGateway(getRef(), HighAvailabilityServices.DEFAULT_LEADER_ID);
 
                                new Within(TestingUtils.TESTING_DURATION()) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 8534ba8..a09c5b2 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -93,7 +93,9 @@ public class UtilsTest extends TestLogger {
 
                        Configuration flinkConfig = new Configuration();
                        YarnConfiguration yarnConfig = new YarnConfiguration();
-                       TestingLeaderRetrievalService leaderRetrievalService = 
new TestingLeaderRetrievalService();
+                       TestingLeaderRetrievalService leaderRetrievalService = 
new TestingLeaderRetrievalService(
+                               null,
+                               null);
                        String applicationMasterHostName = "localhost";
                        String webInterfaceURL = "foobar";
                        ContaineredTaskManagerParameters taskManagerParameters 
= new ContaineredTaskManagerParameters(

Reply via email to