[FLINK-9194][history] Rework and extend the HistoryServer test

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ba41bcd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ba41bcd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ba41bcd

Branch: refs/heads/release-1.5
Commit: 7ba41bcda2749388c9a6e99137b1c8476eeae0d1
Parents: 0d56793
Author: zentol <[email protected]>
Authored: Mon Apr 23 15:13:41 2018 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Tue May 15 07:51:46 2018 +0200

----------------------------------------------------------------------
 .../webmonitor/history/HistoryServerTest.java   | 100 ++++++++++++-------
 1 file changed, 63 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ba41bcd/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index a16f6fb..580d80f 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -21,25 +21,19 @@ package org.apache.flink.runtime.webmonitor.history;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HistoryServerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.messages.ArchiveMessages;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.TestActorRef;
 import org.apache.commons.io.IOUtils;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.Rule;
+import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
@@ -50,43 +44,67 @@ import java.net.URL;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import scala.Option;
-
 /**
  * Tests for the HistoryServer.
  */
 public class HistoryServerTest extends TestLogger {
 
-       @Rule
-       public TemporaryFolder tmpDir = new TemporaryFolder();
-
-       @Test
-       public void testFullArchiveLifecycle() throws Exception {
-               ArchivedExecutionGraph graph = (ArchivedExecutionGraph) ArchivedJobGenerationUtils.getTestJob();
-
-               File jmDirectory = tmpDir.newFolder("jm");
-               File hsDirectory = tmpDir.newFolder("hs");
-
-               Configuration config = new Configuration();
-               config.setString(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString());
+       @ClassRule
+       public static final TemporaryFolder TMP = new TemporaryFolder();
+
+       private MiniClusterResource cluster;
+       private File jmDirectory;
+       private File hsDirectory;
+
+       @Before
+       public void setUp() throws Exception {
+               jmDirectory = TMP.newFolder("jm");
+               hsDirectory = TMP.newFolder("hs");
+
+               Configuration clusterConfig = new Configuration();
+               clusterConfig.setString(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString());
+
+               cluster = new MiniClusterResource(
+                       new MiniClusterResource.MiniClusterResourceConfiguration(
+                               clusterConfig,
+                               1,
+                               1
+                       ),
+                       MiniClusterResource.MiniClusterType.NEW
+               );
+               cluster.before();
+       }
 
-               config.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString());
-               config.setString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR, hsDirectory.getAbsolutePath());
+       @After
+       public void tearDown() {
+               if (cluster != null) {
+                       cluster.after();
+               }
+       }
 
-               config.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0);
+       @Test
+       public void testHistoryServerIntegration() throws Exception {
+               final int numJobs = 2;
+               for (int x = 0; x < numJobs; x++) {
+                       runJob();
+               }
 
-               ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(config);
-               Option<Path> archivePath = Option.apply(new Path(jmDirectory.toURI().toString()));
+               CountDownLatch numFinishedPolls = new CountDownLatch(1);
 
-               ActorRef memoryArchivist = TestActorRef.apply(JobManager.getArchiveProps(MemoryArchivist.class, 1, archivePath), actorSystem);
-               memoryArchivist.tell(new ArchiveMessages.ArchiveExecutionGraph(graph.getJobID(), graph), null);
+               Configuration historyServerConfig = new Configuration();
+               historyServerConfig.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString());
+               historyServerConfig.setString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR, hsDirectory.getAbsolutePath());
 
-               File archive = new File(jmDirectory, graph.getJobID().toString());
-               Assert.assertTrue(archive.exists());
+               historyServerConfig.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0);
 
-               CountDownLatch numFinishedPolls = new CountDownLatch(1);
+               // the job is archived asynchronously after env.execute() returns
+               File[] archives = jmDirectory.listFiles();
+               while (archives == null || archives.length != numJobs) {
+                       Thread.sleep(50);
+                       archives = jmDirectory.listFiles();
+               }
 
-               HistoryServer hs = new HistoryServer(config, numFinishedPolls);
+               HistoryServer hs = new HistoryServer(historyServerConfig, numFinishedPolls);
                try {
                        hs.start();
                        String baseUrl = "http://localhost:" + hs.getWebPort();
@@ -96,12 +114,20 @@ public class HistoryServerTest extends TestLogger {
                        String response = getFromHTTP(baseUrl + JobsOverviewHeaders.URL);
                        MultipleJobsDetails overview = mapper.readValue(response, MultipleJobsDetails.class);
 
-                       Assert.assertEquals(1, overview.getJobs().size());
+                       Assert.assertEquals(numJobs, overview.getJobs().size());
                } finally {
                        hs.stop();
                }
        }
 
+       private static void runJob() throws Exception {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.fromElements(1, 2, 3)
+                       .print();
+
+               env.execute();
+       }
+
        public static String getFromHTTP(String url) throws Exception {
                URL u = new URL(url);
                HttpURLConnection connection = (HttpURLConnection) u.openConnection();

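For context, the reworked test depends on the fact that job archives are written asynchronously after env.execute() returns, so it polls the JobManager archive directory until the expected number of archives appears. A minimal standalone sketch of that polling step follows; the class and method names are illustrative placeholders and not part of the commit, and the directory and count stand in for the test's jmDirectory field and numJobs constant.

import java.io.File;

class ArchivePollingSketch {
	// Busy-wait until the JobManager has written the expected number of job
	// archives; archiving happens asynchronously after env.execute() returns.
	static void waitForArchives(File jmDirectory, int expectedArchives) throws InterruptedException {
		File[] archives = jmDirectory.listFiles();
		while (archives == null || archives.length != expectedArchives) {
			Thread.sleep(50);
			archives = jmDirectory.listFiles();
		}
	}
}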