[FLINK-9194][history] Rework and extend the HistoryServer test
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ba41bcd Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ba41bcd Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ba41bcd Branch: refs/heads/release-1.5 Commit: 7ba41bcda2749388c9a6e99137b1c8476eeae0d1 Parents: 0d56793 Author: zentol <[email protected]> Authored: Mon Apr 23 15:13:41 2018 +0200 Committer: Till Rohrmann <[email protected]> Committed: Tue May 15 07:51:46 2018 +0200 ---------------------------------------------------------------------- .../webmonitor/history/HistoryServerTest.java | 100 ++++++++++++------- 1 file changed, 63 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7ba41bcd/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java index a16f6fb..580d80f 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java @@ -21,25 +21,19 @@ package org.apache.flink.runtime.webmonitor.history; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HistoryServerOptions; import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; -import org.apache.flink.runtime.jobmanager.JobManager; -import org.apache.flink.runtime.jobmanager.MemoryArchivist; -import org.apache.flink.runtime.messages.ArchiveMessages; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; -import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.util.MiniClusterResource; import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.testkit.TestActorRef; import org.apache.commons.io.IOUtils; +import org.junit.After; import org.junit.Assert; -import org.junit.Rule; +import org.junit.Before; +import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -50,43 +44,67 @@ import java.net.URL; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import scala.Option; - /** * Tests for the HistoryServer. */ public class HistoryServerTest extends TestLogger { - @Rule - public TemporaryFolder tmpDir = new TemporaryFolder(); - - @Test - public void testFullArchiveLifecycle() throws Exception { - ArchivedExecutionGraph graph = (ArchivedExecutionGraph) ArchivedJobGenerationUtils.getTestJob(); - - File jmDirectory = tmpDir.newFolder("jm"); - File hsDirectory = tmpDir.newFolder("hs"); - - Configuration config = new Configuration(); - config.setString(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString()); + @ClassRule + public static final TemporaryFolder TMP = new TemporaryFolder(); + + private MiniClusterResource cluster; + private File jmDirectory; + private File hsDirectory; + + @Before + public void setUp() throws Exception { + jmDirectory = TMP.newFolder("jm"); + hsDirectory = TMP.newFolder("hs"); + + Configuration clusterConfig = new Configuration(); + clusterConfig.setString(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString()); + + cluster = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + clusterConfig, + 1, + 1 + ), + MiniClusterResource.MiniClusterType.NEW + ); + cluster.before(); + } - config.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString()); - config.setString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR, hsDirectory.getAbsolutePath()); + @After + public void tearDown() { + if (cluster != null) { + cluster.after(); + } + } - config.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0); + @Test + public void testHistoryServerIntegration() throws Exception { + final int numJobs = 2; + for (int x = 0; x < numJobs; x++) { + runJob(); + } - ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(config); - Option<Path> archivePath = Option.apply(new Path(jmDirectory.toURI().toString())); + CountDownLatch numFinishedPolls = new CountDownLatch(1); - ActorRef memoryArchivist = TestActorRef.apply(JobManager.getArchiveProps(MemoryArchivist.class, 1, archivePath), actorSystem); - memoryArchivist.tell(new ArchiveMessages.ArchiveExecutionGraph(graph.getJobID(), graph), null); + Configuration historyServerConfig = new Configuration(); + historyServerConfig.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString()); + historyServerConfig.setString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR, hsDirectory.getAbsolutePath()); - File archive = new File(jmDirectory, graph.getJobID().toString()); - Assert.assertTrue(archive.exists()); + historyServerConfig.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0); - CountDownLatch numFinishedPolls = new CountDownLatch(1); + // the job is archived asynchronously after env.execute() returns + File[] archives = jmDirectory.listFiles(); + while (archives == null || archives.length != numJobs) { + Thread.sleep(50); + archives = jmDirectory.listFiles(); + } - HistoryServer hs = new HistoryServer(config, numFinishedPolls); + HistoryServer hs = new HistoryServer(historyServerConfig, numFinishedPolls); try { hs.start(); String baseUrl = "http://localhost:" + hs.getWebPort(); @@ -96,12 +114,20 @@ public class HistoryServerTest extends TestLogger { String response = getFromHTTP(baseUrl + JobsOverviewHeaders.URL); MultipleJobsDetails overview = mapper.readValue(response, MultipleJobsDetails.class); - Assert.assertEquals(1, overview.getJobs().size()); + Assert.assertEquals(numJobs, overview.getJobs().size()); } finally { hs.stop(); } } + private static void runJob() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.fromElements(1, 2, 3) + .print(); + + env.execute(); + } + public static String getFromHTTP(String url) throws Exception { URL u = new URL(url); HttpURLConnection connection = (HttpURLConnection) u.openConnection();
