This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fc2d4192080e79f6c8a980e1fddd5367212654ca
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Thu May 28 15:37:26 2020 +0200

    [FLINK-18010][runtime] Expand HistoryServer logging
---
 .../webmonitor/history/HistoryServerArchiveFetcher.java       | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index fd6069a..5a41173 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -186,11 +186,13 @@ class HistoryServerArchiveFetcher {
                @Override
                public void run() {
                        try {
+                               LOG.debug("Starting archive fetching.");
                                List<ArchiveEvent> events = new ArrayList<>();
                                Set<String> jobsToRemove = new 
HashSet<>(cachedArchives);
                                for (HistoryServer.RefreshLocation 
refreshLocation : refreshDirs) {
                                        Path refreshDir = 
refreshLocation.getPath();
                                        FileSystem refreshFS = 
refreshLocation.getFs();
+                                       LOG.debug("Checking archive directory 
{}.", refreshDir);
 
                                        // contents of /:refreshDir
                                        FileStatus[] jobArchives;
@@ -214,7 +216,11 @@ class HistoryServerArchiveFetcher {
                                                        continue;
                                                }
                                                jobsToRemove.remove(jobID);
-                                               if 
(!cachedArchives.contains(jobID)) {
+
+                                               if 
(cachedArchives.contains(jobID)) {
+                                                       LOG.trace("Ignoring 
archive {} because it was already fetched.", jobArchivePath);
+                                               } else {
+                                                       LOG.info("Processing 
archive {}.", jobArchivePath);
                                                        try {
                                                                for 
(ArchivedJson archive : FsJobArchivist.getArchivedJsons(jobArchive.getPath())) {
                                                                        String 
path = archive.getPath();
@@ -224,6 +230,7 @@ class HistoryServerArchiveFetcher {
                                                                        if 
(path.equals(JobsOverviewHeaders.URL)) {
                                                                                
target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
                                                                        } else 
if (path.equals("/joboverview")) { // legacy path
+                                                                               
LOG.debug("Migrating legacy archive {}", jobArchivePath);
                                                                                
json = convertLegacyJobOverview(json);
                                                                                
target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
                                                                        } else {
@@ -253,6 +260,7 @@ class HistoryServerArchiveFetcher {
                                                                }
                                                                events.add(new 
ArchiveEvent(jobID, ArchiveEventType.CREATED));
                                                                
cachedArchives.add(jobID);
+                                                               
LOG.info("Processing archive {} finished.", jobArchivePath);
                                                        } catch (IOException e) 
{
                                                                
LOG.error("Failure while fetching/processing job archive for job {}.", jobID, 
e);
                                                                // Make sure we 
do not include this job in the overview
@@ -281,6 +289,7 @@ class HistoryServerArchiveFetcher {
                                        updateJobOverview(webOverviewDir, 
webDir);
                                }
                                events.forEach(jobArchiveEventListener::accept);
+                               LOG.debug("Finished archive fetching.");
                        } catch (Exception e) {
                                LOG.error("Critical failure while 
fetching/processing job archives.", e);
                        }

Reply via email to