Repository: flink
Updated Branches:
  refs/heads/master 0f8d76c6f -> 4870fd62f


[tests] Fix unstable test WebFrontendITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/acb051d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/acb051d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/acb051d8

Branch: refs/heads/master
Commit: acb051d8db3acca0ff97628b602aa21dcbf3604f
Parents: 0f8d76c
Author: Stephan Ewen <[email protected]>
Authored: Fri Mar 11 18:57:13 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Sun Mar 13 16:03:03 2016 +0100

----------------------------------------------------------------------
 .../flink/test/web/WebFrontendITCase.java       | 171 ++++++++++---------
 1 file changed, 90 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/acb051d8/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index 48888a4..31084a0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -27,28 +27,26 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.testutils.StoppableInvokable;
-import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.files.MimeTypes;
 import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.TestBaseUtils;
-import org.apache.sling.commons.json.JSONArray;
-import org.apache.sling.commons.json.JSONObject;
+
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
+
 import io.netty.handler.codec.http.HttpResponseStatus;
 
-import java.util.Arrays;
-import java.util.Collection;
+import java.io.File;
+import java.nio.file.Files;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -57,30 +55,40 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-@RunWith(Parameterized.class)
-public class WebFrontendITCase extends MultipleProgramsTestBase {
+import static org.apache.flink.test.util.TestBaseUtils.getFromHTTP;
 
-       // make sure that the webserver is started for us!
-       static {
-               startWebServer = true;
-       }
+public class WebFrontendITCase {
 
-       private static int port = -1;
+       private static final int NUM_TASK_MANAGERS = 2;
+       private static final int NUM_SLOTS = 4;
+       
+       private static ForkableFlinkMiniCluster cluster;
 
+       private static int port = -1;
+       
        @BeforeClass
-       public static void initialize() {
-               WebMonitor webMonitor = cluster.webMonitor().get();
-               port = webMonitor.getServerPort();
-       }
-
-       public WebFrontendITCase(TestExecutionMode m) {
-               super(m);
-       }
-
-       @Parameterized.Parameters(name = "Execution mode = {0}")
-       public static Collection<Object[]> executionModes() {
-               return Arrays.<Object[]>asList(
-                       new Object[] { TestExecutionMode.CLUSTER } );
+       public static void initialize() throws Exception {
+               Configuration config = new Configuration();
+               config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
NUM_TASK_MANAGERS);
+               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
NUM_SLOTS);
+               config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 
12);
+               config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
+
+               File logDir = File.createTempFile("TestBaseUtils-logdir", null);
+               assertTrue("Unable to delete temp file", logDir.delete());
+               assertTrue("Unable to create temp directory", logDir.mkdir());
+               File logFile = new File(logDir, "jobmanager.log");
+               File outFile = new File(logDir, "jobmanager.out");
+               
+               Files.createFile(logFile.toPath());
+               Files.createFile(outFile.toPath());
+               
+               config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
logFile.getAbsolutePath());
+
+               cluster = new ForkableFlinkMiniCluster(config, false);
+               cluster.start();
+               
+               port = cluster.webMonitor().get().getServerPort();
        }
 
        @Test
@@ -128,7 +136,8 @@ public class WebFrontendITCase extends 
MultipleProgramsTestBase {
 
                        JsonNode taskManager = taskManagers.get(0);
                        assertNotNull(taskManager);
-                       assertEquals(4, taskManager.get("freeSlots").asInt());
+                       assertEquals(NUM_SLOTS, 
taskManager.get("slotsNumber").asInt());
+                       assertTrue(taskManager.get("freeSlots").asInt() <= 
NUM_SLOTS);
                }
                catch(Exception e) {
                        e.printStackTrace();
@@ -137,22 +146,16 @@ public class WebFrontendITCase extends 
MultipleProgramsTestBase {
        }
 
        @Test
-       public void getLogAndStdoutFiles() {
-               try {
-                       WebMonitorUtils.LogFileLocation logFiles = 
WebMonitorUtils.LogFileLocation.find(cluster.configuration());
+       public void getLogAndStdoutFiles() throws Exception {
+               WebMonitorUtils.LogFileLocation logFiles = 
WebMonitorUtils.LogFileLocation.find(cluster.configuration());
 
-                       FileUtils.writeStringToFile(logFiles.logFile, "job 
manager log");
-                       String logs = getFromHTTP("http://localhost:" + port + 
"/jobmanager/log");
-                       assertTrue(logs.contains("job manager log"));
+               FileUtils.writeStringToFile(logFiles.logFile, "job manager 
log");
+               String logs = getFromHTTP("http://localhost:" + port + 
"/jobmanager/log");
+               assertTrue(logs.contains("job manager log"));
 
-                       FileUtils.writeStringToFile(logFiles.stdOutFile, "job 
manager out");
-                       logs = getFromHTTP("http://localhost:" + port + 
"/jobmanager/stdout");
-                       assertTrue(logs.contains("job manager out"));
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
+               FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager 
out");
+               logs = getFromHTTP("http://localhost:" + port + 
"/jobmanager/stdout");
+               assertTrue(logs.contains("job manager out"));
        }
 
        @Test
@@ -161,7 +164,6 @@ public class WebFrontendITCase extends 
MultipleProgramsTestBase {
                        String config = getFromHTTP("http://localhost:" + port 
+ "/jobmanager/config");
 
                        Map<String, String> conf = 
WebMonitorUtils.fromKeyValueJsonArray(config);
-                       
assertTrue(conf.get(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY).startsWith(logDir.toString()));
                        assertEquals(
                                
cluster.configuration().getString("taskmanager.numberOfTaskSlots", null),
                                
conf.get(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS));
@@ -172,8 +174,11 @@ public class WebFrontendITCase extends 
MultipleProgramsTestBase {
                }
        }
 
-       @Test(timeout = 15000)
+       @Test
        public void testStop() throws Exception {
+               // this only works if there is no active job at this point
+               assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty());
+               
                // Create a task
                final JobVertex sender = new JobVertex("Sender");
                sender.setParallelism(2);
@@ -184,24 +189,34 @@ public class WebFrontendITCase extends 
MultipleProgramsTestBase {
 
                cluster.submitJobDetached(jobGraph);
 
+               // wait for job to show up
+               while (cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+                       Thread.sleep(10);
+               }
+               
                final FiniteDuration testTimeout = new FiniteDuration(2, 
TimeUnit.MINUTES);
                final Deadline deadline = testTimeout.fromNow();
 
-               try (HttpTestClient client = new HttpTestClient("localhost", 
port)) {
-                       // Request the file from the web server
-                       client.sendDeleteRequest("/jobs/" + jid + "/stop", 
deadline.timeLeft());
-                       HttpTestClient.SimpleHttpResponse response = 
client.getNextResponse(deadline.timeLeft());
-
-                       assertEquals(HttpResponseStatus.OK, 
response.getStatus());
-                       assertEquals(response.getType(), 
MimeTypes.getMimeTypeForExtension("json"));
-                       assertEquals("{}", response.getContent());
+               while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+                       try (HttpTestClient client = new 
HttpTestClient("localhost", port)) {
+                               // Request the file from the web server
+                               client.sendDeleteRequest("/jobs/" + jid + 
"/stop", deadline.timeLeft());
+                               HttpTestClient.SimpleHttpResponse response = 
client.getNextResponse(deadline.timeLeft());
+       
+                               assertEquals(HttpResponseStatus.OK, 
response.getStatus());
+                               assertEquals(response.getType(), 
MimeTypes.getMimeTypeForExtension("json"));
+                               assertEquals("{}", response.getContent());
+                       }
+
+                       Thread.sleep(20);
                }
-
-               waitForTaskManagers();
        }
 
-       @Test(timeout = 15000)
+       @Test
        public void testStopYarn() throws Exception {
+               // this only works if there is no active job at this point
+               assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty());
+               
                // Create a task
                final JobVertex sender = new JobVertex("Sender");
                sender.setParallelism(2);
@@ -212,34 +227,28 @@ public class WebFrontendITCase extends 
MultipleProgramsTestBase {
 
                cluster.submitJobDetached(jobGraph);
 
-               final FiniteDuration testTimeout = new FiniteDuration(2, 
TimeUnit.MINUTES);
-               final Deadline deadline = testTimeout.fromNow();
-
-               try (HttpTestClient client = new HttpTestClient("localhost", 
port)) {
-                       // Request the file from the web server
-                       client.sendGetRequest("/jobs/" + jid + "/yarn-stop", 
deadline.timeLeft());
-
-                       HttpTestClient.SimpleHttpResponse response = client
-                                       .getNextResponse(deadline.timeLeft());
-
-                       assertEquals(HttpResponseStatus.OK, 
response.getStatus());
-                       assertEquals(response.getType(), 
MimeTypes.getMimeTypeForExtension("json"));
-                       assertEquals("{}", response.getContent());
+               // wait for job to show up
+               while (cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+                       Thread.sleep(10);
                }
 
-               waitForTaskManagers();
-       }
-
-       private void waitForTaskManagers() throws Exception {
-               int count = 0;
+               final FiniteDuration testTimeout = new FiniteDuration(2, 
TimeUnit.MINUTES);
+               final Deadline deadline = testTimeout.fromNow();
 
-               while (count != 4) {
-                       String json = getFromHTTP("http://localhost:" + port + 
"/taskmanagers/");
-                       JSONObject parsed = new JSONObject(json);
-                       JSONArray taskManagers = 
parsed.getJSONArray("taskmanagers");
-                       JSONObject taskManager = taskManagers.getJSONObject(0);
-                       count = taskManager.getInt("freeSlots");
+               while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+                       try (HttpTestClient client = new 
HttpTestClient("localhost", port)) {
+                               // Request the file from the web server
+                               client.sendGetRequest("/jobs/" + jid + 
"/yarn-stop", deadline.timeLeft());
+       
+                               HttpTestClient.SimpleHttpResponse response = 
client
+                                               
.getNextResponse(deadline.timeLeft());
+       
+                               assertEquals(HttpResponseStatus.OK, 
response.getStatus());
+                               assertEquals(response.getType(), 
MimeTypes.getMimeTypeForExtension("json"));
+                               assertEquals("{}", response.getContent());
+                       }
+                       
+                       Thread.sleep(20);
                }
        }
-
 }

Reply via email to