This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit fc2d4192080e79f6c8a980e1fddd5367212654ca Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Thu May 28 15:37:26 2020 +0200 [FLINK-18010][runtime] Expand HistoryServer logging --- .../webmonitor/history/HistoryServerArchiveFetcher.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java index fd6069a..5a41173 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java @@ -186,11 +186,13 @@ class HistoryServerArchiveFetcher { @Override public void run() { try { + LOG.debug("Starting archive fetching."); List<ArchiveEvent> events = new ArrayList<>(); Set<String> jobsToRemove = new HashSet<>(cachedArchives); for (HistoryServer.RefreshLocation refreshLocation : refreshDirs) { Path refreshDir = refreshLocation.getPath(); FileSystem refreshFS = refreshLocation.getFs(); + LOG.debug("Checking archive directory {}.", refreshDir); // contents of /:refreshDir FileStatus[] jobArchives; @@ -214,7 +216,11 @@ class HistoryServerArchiveFetcher { continue; } jobsToRemove.remove(jobID); - if (!cachedArchives.contains(jobID)) { + + if (cachedArchives.contains(jobID)) { + LOG.trace("Ignoring archive {} because it was already fetched.", jobArchivePath); + } else { + LOG.info("Processing archive {}.", jobArchivePath); try { for (ArchivedJson archive : FsJobArchivist.getArchivedJsons(jobArchive.getPath())) { String path = archive.getPath(); @@ -224,6 +230,7 @@ class HistoryServerArchiveFetcher { if (path.equals(JobsOverviewHeaders.URL)) { target = new File(webOverviewDir, jobID + JSON_FILE_ENDING); } else if (path.equals("/joboverview")) { // legacy path
+ LOG.debug("Migrating legacy archive {}", jobArchivePath); json = convertLegacyJobOverview(json); target = new File(webOverviewDir, jobID + JSON_FILE_ENDING); } else { @@ -253,6 +260,7 @@ class HistoryServerArchiveFetcher { } events.add(new ArchiveEvent(jobID, ArchiveEventType.CREATED)); cachedArchives.add(jobID); + LOG.info("Processing archive {} finished.", jobArchivePath); } catch (IOException e) { LOG.error("Failure while fetching/processing job archive for job {}.", jobID, e); // Make sure we do not include this job in the overview @@ -281,6 +289,7 @@ class HistoryServerArchiveFetcher { updateJobOverview(webOverviewDir, webDir); } events.forEach(jobArchiveEventListener::accept); + LOG.debug("Finished archive fetching."); } catch (Exception e) { LOG.error("Critical failure while fetching/processing job archives.", e); }