Repository: flink Updated Branches: refs/heads/master 0f8d76c6f -> 4870fd62f
[tests] Fix unstable test WebFrontendITCase Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/acb051d8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/acb051d8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/acb051d8 Branch: refs/heads/master Commit: acb051d8db3acca0ff97628b602aa21dcbf3604f Parents: 0f8d76c Author: Stephan Ewen <[email protected]> Authored: Fri Mar 11 18:57:13 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Sun Mar 13 16:03:03 2016 +0100 ---------------------------------------------------------------------- .../flink/test/web/WebFrontendITCase.java | 171 ++++++++++--------- 1 file changed, 90 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/acb051d8/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java index 48888a4..31084a0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java @@ -27,28 +27,26 @@ import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.testutils.StoppableInvokable; -import org.apache.flink.runtime.webmonitor.WebMonitor; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; import org.apache.flink.runtime.webmonitor.files.MimeTypes; import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient; -import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.test.util.TestBaseUtils; -import org.apache.sling.commons.json.JSONArray; -import org.apache.sling.commons.json.JSONObject; + import org.junit.BeforeClass; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; + import io.netty.handler.codec.http.HttpResponseStatus; -import java.util.Arrays; -import java.util.Collection; +import java.io.File; +import java.nio.file.Files; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -57,30 +55,40 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -@RunWith(Parameterized.class) -public class WebFrontendITCase extends MultipleProgramsTestBase { +import static org.apache.flink.test.util.TestBaseUtils.getFromHTTP; - // make sure that the webserver is started for us! - static { - startWebServer = true; - } +public class WebFrontendITCase { - private static int port = -1; + private static final int NUM_TASK_MANAGERS = 2; + private static final int NUM_SLOTS = 4; + + private static ForkableFlinkMiniCluster cluster; + private static int port = -1; + @BeforeClass - public static void initialize() { - WebMonitor webMonitor = cluster.webMonitor().get(); - port = webMonitor.getServerPort(); - } - - public WebFrontendITCase(TestExecutionMode m) { - super(m); - } - - @Parameterized.Parameters(name = "Execution mode = {0}") - public static Collection<Object[]> executionModes() { - return Arrays.<Object[]>asList( - new Object[] { TestExecutionMode.CLUSTER } ); + public static void initialize() throws Exception { + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS); + config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12); + config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); + + File logDir = File.createTempFile("TestBaseUtils-logdir", null); + assertTrue("Unable to delete temp file", logDir.delete()); + assertTrue("Unable to create temp directory", logDir.mkdir()); + File logFile = new File(logDir, "jobmanager.log"); + File outFile = new File(logDir, "jobmanager.out"); + + Files.createFile(logFile.toPath()); + Files.createFile(outFile.toPath()); + + config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.getAbsolutePath()); + + cluster = new ForkableFlinkMiniCluster(config, false); + cluster.start(); + + port = cluster.webMonitor().get().getServerPort(); } @Test @@ -128,7 +136,8 @@ public class WebFrontendITCase extends MultipleProgramsTestBase { JsonNode taskManager = taskManagers.get(0); assertNotNull(taskManager); - assertEquals(4, taskManager.get("freeSlots").asInt()); + assertEquals(NUM_SLOTS, taskManager.get("slotsNumber").asInt()); + assertTrue(taskManager.get("freeSlots").asInt() <= NUM_SLOTS); } catch(Exception e) { e.printStackTrace(); @@ -137,22 +146,16 @@ public class WebFrontendITCase extends MultipleProgramsTestBase { } @Test - public void getLogAndStdoutFiles() { - try { - WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration()); + public void getLogAndStdoutFiles() throws Exception { + WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration()); - FileUtils.writeStringToFile(logFiles.logFile, "job manager log"); - String logs = getFromHTTP("http://localhost:" + port + "/jobmanager/log"); - assertTrue(logs.contains("job manager log")); + FileUtils.writeStringToFile(logFiles.logFile, "job manager log"); + String logs = getFromHTTP("http://localhost:" + port + "/jobmanager/log"); + assertTrue(logs.contains("job manager log")); - FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out"); - logs = getFromHTTP("http://localhost:" + port + "/jobmanager/stdout"); - assertTrue(logs.contains("job manager out")); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out"); + logs = getFromHTTP("http://localhost:" + port + "/jobmanager/stdout"); + assertTrue(logs.contains("job manager out")); } @Test @@ -161,7 +164,6 @@ public class WebFrontendITCase extends MultipleProgramsTestBase { String config = getFromHTTP("http://localhost:" + port + "/jobmanager/config"); Map<String, String> conf = WebMonitorUtils.fromKeyValueJsonArray(config); - assertTrue(conf.get(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY).startsWith(logDir.toString())); assertEquals( cluster.configuration().getString("taskmanager.numberOfTaskSlots", null), conf.get(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS)); @@ -172,8 +174,11 @@ public class WebFrontendITCase extends MultipleProgramsTestBase { } } - @Test(timeout = 15000) + @Test public void testStop() throws Exception { + // this only works if there is no active job at this point + assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty()); + // Create a task final JobVertex sender = new JobVertex("Sender"); sender.setParallelism(2); @@ -184,24 +189,34 @@ public class WebFrontendITCase extends MultipleProgramsTestBase { cluster.submitJobDetached(jobGraph); + // wait for job to show up + while (cluster.getCurrentlyRunningJobsJava().isEmpty()) { + Thread.sleep(10); + } + final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES); final Deadline deadline = testTimeout.fromNow(); - try (HttpTestClient client = new HttpTestClient("localhost", port)) { - // Request the file from the web server - client.sendDeleteRequest("/jobs/" + jid + "/stop", deadline.timeLeft()); - HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft()); - - assertEquals(HttpResponseStatus.OK, response.getStatus()); - assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json")); - assertEquals("{}", response.getContent()); + while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) { + try (HttpTestClient client = new HttpTestClient("localhost", port)) { + // Request the file from the web server + client.sendDeleteRequest("/jobs/" + jid + "/stop", deadline.timeLeft()); + HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft()); + + assertEquals(HttpResponseStatus.OK, response.getStatus()); + assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json")); + assertEquals("{}", response.getContent()); + } + + Thread.sleep(20); } - - waitForTaskManagers(); } - @Test(timeout = 15000) + @Test public void testStopYarn() throws Exception { + // this only works if there is no active job at this point + assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty()); + // Create a task final JobVertex sender = new JobVertex("Sender"); sender.setParallelism(2); @@ -212,34 +227,28 @@ public class WebFrontendITCase extends MultipleProgramsTestBase { cluster.submitJobDetached(jobGraph); - final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES); - final Deadline deadline = testTimeout.fromNow(); - - try (HttpTestClient client = new HttpTestClient("localhost", port)) { - // Request the file from the web server - client.sendGetRequest("/jobs/" + jid + "/yarn-stop", deadline.timeLeft()); - - HttpTestClient.SimpleHttpResponse response = client - .getNextResponse(deadline.timeLeft()); - - assertEquals(HttpResponseStatus.OK, response.getStatus()); - assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json")); - assertEquals("{}", response.getContent()); + // wait for job to show up + while (cluster.getCurrentlyRunningJobsJava().isEmpty()) { + Thread.sleep(10); } - waitForTaskManagers(); - } - - private void waitForTaskManagers() throws Exception { - int count = 0; + final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES); + final Deadline deadline = testTimeout.fromNow(); - while (count != 4) { - String json = getFromHTTP("http://localhost:" + port + "/taskmanagers/"); - JSONObject parsed = new JSONObject(json); - JSONArray taskManagers = parsed.getJSONArray("taskmanagers"); - JSONObject taskManager = taskManagers.getJSONObject(0); - count = taskManager.getInt("freeSlots"); + while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) { + try (HttpTestClient client = new HttpTestClient("localhost", port)) { + // Request the file from the web server + client.sendGetRequest("/jobs/" + jid + "/yarn-stop", deadline.timeLeft()); + + HttpTestClient.SimpleHttpResponse response = client + .getNextResponse(deadline.timeLeft()); + + assertEquals(HttpResponseStatus.OK, response.getStatus()); + assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json")); + assertEquals("{}", response.getContent()); + } + + Thread.sleep(20); } } - }
