This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.8 by this push: new d540794 [FLINK-13892][hs] Harden HistoryServerTest d540794 is described below commit d5407947f23f7a6a9e8a8dbc7d2e78ea6257b7a8 Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Thu Aug 29 13:15:08 2019 +0200 [FLINK-13892][hs] Harden HistoryServerTest --- .../flink/runtime/webmonitor/history/HistoryServer.java | 6 +++--- .../webmonitor/history/HistoryServerArchiveFetcher.java | 12 ++++++------ .../flink/runtime/webmonitor/history/HistoryServerTest.java | 7 ++++--- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java index a93fe93..f1a5330 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java @@ -140,9 +140,9 @@ public class HistoryServer { this(config, new CountDownLatch(0)); } - public HistoryServer(Configuration config, CountDownLatch numFinishedPolls) throws IOException, FlinkException { + public HistoryServer(Configuration config, CountDownLatch numArchivedJobs) throws IOException, FlinkException { Preconditions.checkNotNull(config); - Preconditions.checkNotNull(numFinishedPolls); + Preconditions.checkNotNull(numArchivedJobs); this.config = config; if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.isRestSSLEnabled(config)) { @@ -187,7 +187,7 @@ public class HistoryServer { } long refreshIntervalMillis = config.getLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL); - archiveFetcher = new HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir, numFinishedPolls); + archiveFetcher = new HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir, numArchivedJobs); this.shutdownHook = ShutdownHookUtil.addShutdownHook( HistoryServer.this::stop, diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java index f95b14c..47888cd 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java @@ -79,9 +79,9 @@ class HistoryServerArchiveFetcher { private final JobArchiveFetcherTask fetcherTask; private final long refreshIntervalMillis; - HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numFinishedPolls) { + HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numArchivedJobs) { this.refreshIntervalMillis = refreshIntervalMillis; - this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir, numFinishedPolls); + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir, numArchivedJobs); if (LOG.isInfoEnabled()) { for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); @@ -112,7 +112,7 @@ class HistoryServerArchiveFetcher { static class JobArchiveFetcherTask extends TimerTask { private final List<HistoryServer.RefreshLocation> refreshDirs; - private final CountDownLatch numFinishedPolls; + private final CountDownLatch numArchivedJobs; /** Cache of all available jobs identified by their id. */ private final Set<String> cachedArchives; @@ -123,9 +123,9 @@ class HistoryServerArchiveFetcher { private static final String JSON_FILE_ENDING = ".json"; - JobArchiveFetcherTask(List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numFinishedPolls) { + JobArchiveFetcherTask(List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numArchivedJobs) { this.refreshDirs = checkNotNull(refreshDirs); - this.numFinishedPolls = numFinishedPolls; + this.numArchivedJobs = numArchivedJobs; this.cachedArchives = new HashSet<>(); this.webDir = checkNotNull(webDir); this.webJobDir = new File(webDir, "jobs"); @@ -200,6 +200,7 @@ class HistoryServerArchiveFetcher { } } updateOverview = true; + numArchivedJobs.countDown(); } catch (IOException e) { LOG.error("Failure while fetching/processing job archive for job {}.", jobID, e); // Make sure we attempt to fetch the archive again @@ -228,7 +229,6 @@ class HistoryServerArchiveFetcher { } catch (Exception e) { LOG.error("Critical failure while fetching/processing job archives.", e); } - numFinishedPolls.countDown(); } } diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java index 03bcfa8..f88ae01 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java @@ -113,11 +113,12 @@ public class HistoryServerTest extends TestLogger { } createLegacyArchive(jmDirectory.toPath()); - CountDownLatch numFinishedPolls = new CountDownLatch(1); + CountDownLatch numExpectedArchivedJobs = new CountDownLatch(numJobs + 1); Configuration historyServerConfig = new Configuration(); historyServerConfig.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString()); historyServerConfig.setString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR, hsDirectory.getAbsolutePath()); + historyServerConfig.setLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL, 100L); historyServerConfig.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0); @@ -128,11 +129,11 @@ public class HistoryServerTest extends TestLogger { archives = jmDirectory.listFiles(); } - HistoryServer hs = new HistoryServer(historyServerConfig, numFinishedPolls); + HistoryServer hs = new HistoryServer(historyServerConfig, numExpectedArchivedJobs); try { hs.start(); String baseUrl = "http://localhost:" + hs.getWebPort(); - numFinishedPolls.await(10L, TimeUnit.SECONDS); + numExpectedArchivedJobs.await(10L, TimeUnit.SECONDS); ObjectMapper mapper = new ObjectMapper(); String response = getFromHTTP(baseUrl + JobsOverviewHeaders.URL);