This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e3adbdce5a0 [FLINK-38777][history] HistoryServer supports application archives
e3adbdce5a0 is described below

commit e3adbdce5a02a67656b49f765ddc328846d22082
Author: Yi Zhang <[email protected]>
AuthorDate: Fri Jan 16 16:34:56 2026 +0800

    [FLINK-38777][history] HistoryServer supports application archives
---
 .../generated/history_server_configuration.html    |  18 +-
 .../flink/configuration/HistoryServerOptions.java  |  93 +++-
 .../runtime/webmonitor/history/HistoryServer.java  |  41 +-
 .../HistoryServerApplicationArchiveFetcher.java    | 273 ++++++++++++
 .../history/HistoryServerArchiveFetcher.java       | 274 ++++++------
 ...dStrategy.java => ArchiveRetainedStrategy.java} |   2 +-
 ....java => CompositeArchiveRetainedStrategy.java} |  48 +-
 .../webmonitor/history/HistoryServerTest.java      | 488 ++++++++++++++++++++-
 ...a => CompositeArchiveRetainedStrategyTest.java} |  94 +++-
 9 files changed, 1113 insertions(+), 218 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/history_server_configuration.html b/docs/layouts/shortcodes/generated/history_server_configuration.html
index 8b171ba627c..db696545a18 100644
--- a/docs/layouts/shortcodes/generated/history_server_configuration.html
+++ b/docs/layouts/shortcodes/generated/history_server_configuration.html
@@ -8,11 +8,17 @@
         </tr>
     </thead>
     <tbody>
+        <tr>
+            <td><h5>historyserver.archive.clean-expired-applications</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether HistoryServer should clean up applications that are no longer present in the archive directory defined by <code class="highlighter-rouge">historyserver.archive.fs.dir</code>. </td>
+        </tr>
         <tr>
             <td><h5>historyserver.archive.clean-expired-jobs</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
-            <td>Whether HistoryServer should cleanup jobs that are no longer present `historyserver.archive.fs.dir`.</td>
+            <td>Whether HistoryServer should clean up jobs that are no longer present in the archive directory defined by <code class="highlighter-rouge">historyserver.archive.fs.dir</code>. <br />Note: This option applies only to legacy job archives created before the introduction of application archiving (FLINK-38761).</td>
         </tr>
         <tr>
             <td><h5>historyserver.archive.fs.dir</h5></td>
@@ -26,17 +32,23 @@
             <td>Duration</td>
             <td>Interval for refreshing the archived job directories.</td>
         </tr>
+        <tr>
+            <td><h5>historyserver.archive.retained-applications</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>Integer</td>
+            <td>The maximum number of applications to retain in each archive directory defined by <code class="highlighter-rouge">historyserver.archive.fs.dir</code>. This option works together with the TTL (see <code class="highlighter-rouge">historyserver.archive.retained-ttl</code>). Archived entities will be removed if their TTL has expired or the retention count limit has been reached. <br />If set to <code class="highlighter-rouge">-1</code> (default), there is no limit to the numb [...]
+        </tr>
         <tr>
             <td><h5>historyserver.archive.retained-jobs</h5></td>
             <td style="word-wrap: break-word;">-1</td>
             <td>Integer</td>
-            <td>The maximum number of jobs to retain in each archive directory defined by <code class="highlighter-rouge">historyserver.archive.fs.dir</code>. <ul><li>If the option is not specified as a positive number without specifying <code class="highlighter-rouge">historyserver.archive.retained-ttl</code>, all of the jobs archives will be retained. </li><li>If the option is specified as a positive number without specifying a value of <code class="highlighter-rouge">historyserver.arc [...]
+            <td>The maximum number of jobs to retain in each archive directory defined by <code class="highlighter-rouge">historyserver.archive.fs.dir</code>. <ul><li>If the option is not specified as a positive number without specifying <code class="highlighter-rouge">historyserver.archive.retained-ttl</code>, all of the jobs archives will be retained. </li><li>If the option is specified as a positive number without specifying a value of <code class="highlighter-rouge">historyserver.arc [...]
         </tr>
         <tr>
             <td><h5>historyserver.archive.retained-ttl</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Duration</td>
-            <td>The time-to-live duration to retain the jobs archived in each archive directory defined by <code class="highlighter-rouge">historyserver.archive.fs.dir</code>. <ul><li>If the option is not specified without specifying <code class="highlighter-rouge">historyserver.archive.retained-jobs</code>, all of the jobs archives will be retained. </li><li>If the option is specified without specifying <code class="highlighter-rouge">historyserver.archive.retained-jobs</code>, the jobs [...]
+            <td>The time-to-live duration to retain the archived entities (jobs and applications) in each archive directory defined by <code class="highlighter-rouge">historyserver.archive.fs.dir</code>. This option works together with the retention count limits (see <code class="highlighter-rouge">historyserver.archive.retained-applications</code> and <code class="highlighter-rouge">historyserver.archive.retained-jobs</code>). Archived entities will be removed if their TTL has expired o [...]
         </tr>
         <tr>
             <td><h5>historyserver.log.jobmanager.url-pattern</h5></td>
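
For readers who want to try the new options end to end, a minimal sketch using the Configuration API follows (a hedged example, not part of this commit; the values are illustrative, not defaults, while the option constants are the ones added below):

    import java.time.Duration;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.HistoryServerOptions;

    class HistoryServerRetentionExample {
        // Hedged sketch of wiring the options introduced by this commit.
        static Configuration historyServerConfig() {
            Configuration config = new Configuration();
            // Drop cached applications whose remote archives have disappeared.
            config.set(HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS, true);
            // Keep at most 100 application archives per archive directory...
            config.set(HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS, 100);
            // ...and additionally expire anything older than 7 days.
            config.set(HistoryServerOptions.HISTORY_SERVER_RETAINED_TTL, Duration.ofDays(7));
            return config;
        }
    }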
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
index 1628d266f35..1677dc2e1a0 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
@@ -51,17 +51,6 @@ public class HistoryServerOptions {
                                     + " monitor these directories for archived 
jobs. You can configure the JobManager to archive jobs to a"
                                     + " directory via 
`jobmanager.archive.fs.dir`.");
 
-    /** If this option is enabled then deleted job archives are also deleted from HistoryServer. */
-    public static final ConfigOption<Boolean> HISTORY_SERVER_CLEANUP_EXPIRED_JOBS =
-            key("historyserver.archive.clean-expired-jobs")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription(
-                            String.format(
-                                    "Whether HistoryServer should cleanup jobs"
-                                            + " that are no longer present `%s`.",
-                                    HISTORY_SERVER_ARCHIVE_DIRS.key()));
-
     /**
      * Pattern of the log URL of TaskManager. The HistoryServer will generate actual URLs from it.
      */
@@ -137,6 +126,24 @@ public class HistoryServerOptions {
             "Specify the option in only one HistoryServer instance to avoid 
errors caused by multiple instances simultaneously cleaning up remote files, ";
     private static final String CONFIGURE_CONSISTENT =
             "Or you can keep the value of this configuration consistent across 
them. ";
+    private static final String LEGACY_NOTE_MESSAGE =
+            "Note: This option applies only to legacy job archives created 
before the introduction of application archiving (FLINK-38761).";
+    private static final String RETAINED_STRATEGY_MESSAGE =
+            "Archived entities will be removed if their TTL has expired or the 
retention count limit has been reached. ";
+
+    /** If this option is enabled then deleted job archives are also deleted from HistoryServer. */
+    public static final ConfigOption<Boolean> HISTORY_SERVER_CLEANUP_EXPIRED_JOBS =
+            key("historyserver.archive.clean-expired-jobs")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Whether HistoryServer should clean up jobs that are no longer present in the archive directory defined by %s. ",
+                                            code(HISTORY_SERVER_ARCHIVE_DIRS.key()))
+                                    .linebreak()
+                                    .text(LEGACY_NOTE_MESSAGE)
+                                    .build());
 
     public static final ConfigOption<Integer> HISTORY_SERVER_RETAINED_JOBS =
             key(HISTORY_SERVER_RETAINED_JOBS_KEY)
@@ -164,6 +171,52 @@ public class HistoryServerOptions {
                                             code("IllegalConfigurationException"))
                                     .linebreak()
                                     .text(NOTE_MESSAGE)
+                                    .list(
+                                            text(CONFIGURE_SINGLE_INSTANCE),
+                                            text(CONFIGURE_CONSISTENT))
+                                    .linebreak()
+                                    .text(LEGACY_NOTE_MESSAGE)
+                                    .build());
+
+    /**
+     * If this option is enabled then deleted application archives are also deleted from
+     * HistoryServer.
+     */
+    public static final ConfigOption<Boolean> HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS =
+            key("historyserver.archive.clean-expired-applications")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Whether HistoryServer should clean up applications that are no longer present in the archive directory defined by %s. ",
+                                            code(HISTORY_SERVER_ARCHIVE_DIRS.key()))
+                                    .build());
+
+    public static final ConfigOption<Integer> HISTORY_SERVER_RETAINED_APPLICATIONS =
+            key("historyserver.archive.retained-applications")
+                    .intType()
+                    .defaultValue(-1)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The maximum number of applications to retain in each archive directory defined by %s. ",
+                                            code(HISTORY_SERVER_ARCHIVE_DIRS.key()))
+                                    .text(
+                                            "This option works together with the TTL (see %s). ",
+                                            code(HISTORY_SERVER_RETAINED_TTL_KEY))
+                                    .text(RETAINED_STRATEGY_MESSAGE)
+                                    .linebreak()
+                                    .text(
+                                            "If set to %s (default), there is no limit to the number of archives. ",
+                                            code("-1"))
+                                    .text(
+                                            "If set to %s or less than %s, HistoryServer will throw an %s. ",
+                                            code("0"),
+                                            code("-1"),
+                                            code("IllegalConfigurationException"))
+                                    .linebreak()
+                                    .text(NOTE_MESSAGE)
                                     .list(
                                             text(CONFIGURE_SINGLE_INSTANCE),
                                             text(CONFIGURE_CONSISTENT))
@@ -176,18 +229,14 @@ public class HistoryServerOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "The time-to-live duration to retain the jobs archived in each archive directory defined by %s. ",
+                                            "The time-to-live duration to retain the archived entities (jobs and applications) in each archive directory defined by %s. ",
                                             code(HISTORY_SERVER_ARCHIVE_DIRS.key()))
-                                    .list(
-                                            text(
-                                                    "If the option is not specified without specifying %s, all of the jobs archives will be retained. ",
-                                                    code(HISTORY_SERVER_RETAINED_JOBS_KEY)),
-                                            text(
-                                                    "If the option is specified without specifying %s, the jobs archive whose modification time in the time-to-live duration will be retained. ",
-                                                    code(HISTORY_SERVER_RETAINED_JOBS_KEY)),
-                                            text(
-                                                    "If this option is specified as a positive time duration together with the %s option, the job archive will be removed if its TTL has expired or the retained job count has been reached. ",
-                                                    code(HISTORY_SERVER_RETAINED_JOBS_KEY)))
+                                    .text(
+                                            "This option works together with the retention count limits (see %s and %s). ",
+                                            code(HISTORY_SERVER_RETAINED_APPLICATIONS.key()),
+                                            code(HISTORY_SERVER_RETAINED_JOBS_KEY))
+                                    .text(RETAINED_STRATEGY_MESSAGE)
+                                    .linebreak()
                                     .text(
                                             "If set to equal to or less than %s milliseconds, HistoryServer will throw an %s. ",
                                             code("0"), code("IllegalConfigurationException"))
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 69d14e7fecb..95648156093 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -37,7 +37,7 @@ import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.Runnables;
-import org.apache.flink.runtime.webmonitor.history.retaining.CompositeJobRetainedStrategy;
+import org.apache.flink.runtime.webmonitor.history.retaining.CompositeArchiveRetainedStrategy;
 import org.apache.flink.runtime.webmonitor.utils.LogUrlUtil;
 import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
 import org.apache.flink.util.ExceptionUtils;
@@ -93,6 +93,8 @@ import java.util.function.Consumer;
  *   <li>/config
  *   <li>/joboverview
  *   <li>/jobs/:jobid/*
+ *   <li>/applications/overview
+ *   <li>/applications/:applicationid/*
  * </ul>
  *
  * <p>and relies on static files that are served by the {@link
@@ -110,8 +112,18 @@ public class HistoryServer {
     private final long webRefreshIntervalMillis;
     private final File webDir;
 
+    /**
+     * This fetcher is responsible for fetching job archives that are not part of an application
+     * (legacy jobs created before application archiving was introduced in FLINK-38761).
+     */
     private final HistoryServerArchiveFetcher archiveFetcher;
 
+    /**
+     * This fetcher is responsible for fetching application archives and their associated job
+     * archives.
+     */
+    private final HistoryServerApplicationArchiveFetcher applicationArchiveFetcher;
+
     @Nullable private final SSLHandlerFactory serverSSLFactory;
     private WebFrontendBootstrap netty;
 
@@ -161,7 +173,7 @@ public class HistoryServer {
     }
 
     public HistoryServer(Configuration config) throws IOException, FlinkException {
-        this(config, (event) -> {});
+        this(config, (event) -> {}, (event) -> {});
     }
 
     /**
@@ -175,7 +187,9 @@ public class HistoryServer {
      */
     public HistoryServer(
             Configuration config,
-            Consumer<HistoryServerArchiveFetcher.ArchiveEvent> jobArchiveEventListener)
+            Consumer<HistoryServerArchiveFetcher.ArchiveEvent> jobArchiveEventListener,
+            Consumer<HistoryServerApplicationArchiveFetcher.ArchiveEvent>
+                    applicationArchiveEventListener)
             throws IOException, FlinkException {
         Preconditions.checkNotNull(config);
         Preconditions.checkNotNull(jobArchiveEventListener);
@@ -199,8 +213,10 @@ public class HistoryServer {
 
         webDir = clearWebDir(config);
 
-        boolean cleanupExpiredArchives =
+        boolean cleanupExpiredJobs =
                 config.get(HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS);
+        boolean cleanupExpiredApplications =
+                config.get(HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS);
 
         String refreshDirectories = config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS);
         if (refreshDirectories == null) {
@@ -235,8 +251,15 @@ public class HistoryServer {
                         refreshDirs,
                         webDir,
                         jobArchiveEventListener,
-                        cleanupExpiredArchives,
-                        CompositeJobRetainedStrategy.createFrom(config));
+                        cleanupExpiredJobs,
+                        CompositeArchiveRetainedStrategy.createForJobFromConfig(config));
+        applicationArchiveFetcher =
+                new HistoryServerApplicationArchiveFetcher(
+                        refreshDirs,
+                        webDir,
+                        applicationArchiveEventListener,
+                        cleanupExpiredApplications,
+                        CompositeArchiveRetainedStrategy.createForApplicationFromConfig(config));
 
         this.shutdownHook =
                 ShutdownHookUtil.addShutdownHook(
@@ -339,7 +362,11 @@ public class HistoryServer {
 
     private Runnable getArchiveFetchingRunnable() {
         return Runnables.withUncaughtExceptionHandler(
-                () -> archiveFetcher.fetchArchives(), FatalExitExceptionHandler.INSTANCE);
+                () -> {
+                    archiveFetcher.fetchArchives();
+                    applicationArchiveFetcher.fetchArchives();
+                },
+                FatalExitExceptionHandler.INSTANCE);
     }
 
     void stop() {
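
For orientation, a hypothetical caller in the same package (e.g. a test, since the fetcher types are package-private) could wire both listeners through the new three-argument constructor as below; config and LOG are assumed to be a Configuration and an slf4j Logger already in scope:

    // Hedged sketch; ArchiveEvent#getId()/#getType() are the renamed accessors
    // introduced further down in this commit.
    HistoryServer historyServer =
            new HistoryServer(
                    config,
                    jobEvent ->
                            LOG.info("job archive {}: {}", jobEvent.getId(), jobEvent.getType()),
                    appEvent ->
                            LOG.info(
                                    "application archive {}: {}",
                                    appEvent.getId(),
                                    appEvent.getType()));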
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerApplicationArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerApplicationArchiveFetcher.java
new file mode 100644
index 00000000000..310e91b7753
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerApplicationArchiveFetcher.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.ArchivePathUtils;
+import org.apache.flink.runtime.history.FsJsonArchivist;
+import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleApplicationsDetails;
+import org.apache.flink.runtime.rest.messages.ApplicationsOverviewHeaders;
+import org.apache.flink.runtime.webmonitor.history.retaining.ArchiveRetainedStrategy;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+
+/**
+ * This class is used by the {@link HistoryServer} to fetch the application and job archives that
+ * are located at {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS}. The directories are
+ * polled at regular intervals, defined by {@link
+ * HistoryServerOptions#HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL}.
+ *
+ * <p>The archives are downloaded and expanded into a file structure analogous to the REST API.
+ *
+ * <p>Removes existing archives from these directories and the cache according to {@link
+ * ArchiveRetainedStrategy} and {@link
+ * HistoryServerOptions#HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS}.
+ */
+public class HistoryServerApplicationArchiveFetcher extends HistoryServerArchiveFetcher {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(HistoryServerApplicationArchiveFetcher.class);
+
+    private static final String APPLICATIONS_SUBDIR = "applications";
+    private static final String APPLICATION_OVERVIEWS_SUBDIR = "application-overviews";
+
+    private final Map<Path, Map<String, Set<String>>> cachedApplicationIdsToJobIds =
+            new HashMap<>();
+
+    private final File webApplicationDir;
+    private final File webApplicationsOverviewDir;
+
+    HistoryServerApplicationArchiveFetcher(
+            List<HistoryServer.RefreshLocation> refreshDirs,
+            File webDir,
+            Consumer<HistoryServerApplicationArchiveFetcher.ArchiveEvent> archiveEventListener,
+            boolean cleanupExpiredArchives,
+            ArchiveRetainedStrategy retainedStrategy)
+            throws IOException {
+        super(refreshDirs, webDir, archiveEventListener, cleanupExpiredArchives, retainedStrategy);
+
+        for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
+            cachedApplicationIdsToJobIds.put(refreshDir.getPath(), new HashMap<>());
+        }
+        this.webApplicationDir = new File(webDir, APPLICATIONS_SUBDIR);
+        Files.createDirectories(webApplicationDir.toPath());
+        this.webApplicationsOverviewDir = new File(webDir, APPLICATION_OVERVIEWS_SUBDIR);
+        Files.createDirectories(webApplicationsOverviewDir.toPath());
+        updateApplicationOverview();
+    }
+
+    @Override
+    List<FileStatus> listValidArchives(FileSystem refreshFS, Path refreshDir) throws IOException {
+        List<FileStatus> applicationArchiveDirs = new ArrayList<>();
+        FileStatus[] clusterDirs = refreshFS.listStatus(refreshDir);
+        if (clusterDirs == null) {
+            // the entire refreshDirectory was removed
+            return applicationArchiveDirs;
+        }
+
+        // Check the cluster directories for application archive directories named according to
+        // the application ID format.
+        for (FileStatus clusterDir : clusterDirs) {
+            if (clusterDir.isDir() && isValidId(clusterDir.getPath().getName(), refreshDir)) {
+                Path applicationsDir =
+                        new Path(clusterDir.getPath(), ArchivePathUtils.APPLICATIONS_DIR);
+                FileStatus[] applicationDirs = refreshFS.listStatus(applicationsDir);
+                if (applicationDirs == null) {
+                    // the entire applicationsDirectory was removed
+                    return applicationArchiveDirs;
+                }
+
+                for (FileStatus applicationDir : applicationDirs) {
+                    if (applicationDir.isDir()
+                            && isValidId(applicationDir.getPath().getName(), refreshDir)) {
+                        applicationArchiveDirs.add(applicationDir);
+                    }
+                }
+            }
+        }
+
+        return applicationArchiveDirs;
+    }
+
+    private boolean isValidId(String id, Path refreshDir) {
+        try {
+            ApplicationID.fromHexString(id);
+            return true;
+        } catch (IllegalArgumentException iae) {
+            LOG.debug(
+                    "Archive directory {} contained file with unexpected name 
{}. Ignoring file.",
+                    refreshDir,
+                    id,
+                    iae);
+            return false;
+        }
+    }
+
+    @Override
+    List<ArchiveEvent> processArchive(String archiveId, Path archivePath, Path refreshDir)
+            throws IOException {
+        FileSystem fs = archivePath.getFileSystem();
+        Path applicationArchive = new Path(archivePath, ArchivePathUtils.APPLICATION_ARCHIVE_NAME);
+        if (!fs.exists(applicationArchive)) {
+            throw new IOException("Application archive " + applicationArchive + " does not exist.");
+        }
+
+        List<ArchiveEvent> events = new ArrayList<>();
+        events.add(processApplicationArchive(archiveId, applicationArchive));
+
+        Path jobArchivesDir = new Path(archivePath, ArchivePathUtils.JOBS_DIR);
+
+        List<FileStatus> jobArchives = listValidJobArchives(fs, jobArchivesDir);
+        for (FileStatus jobArchive : jobArchives) {
+            String jobId = jobArchive.getPath().getName();
+            cachedApplicationIdsToJobIds
+                    .get(refreshDir)
+                    .computeIfAbsent(archiveId, k -> new HashSet<>())
+                    .add(jobId);
+            events.add(processJobArchive(jobId, jobArchive.getPath()));
+        }
+
+        return events;
+    }
+
+    private ArchiveEvent processApplicationArchive(String applicationId, Path applicationArchive)
+            throws IOException {
+        for (ArchivedJson archive : FsJsonArchivist.readArchivedJsons(applicationArchive)) {
+            String path = archive.getPath();
+            String json = archive.getJson();
+
+            File target;
+            if (path.equals(ApplicationsOverviewHeaders.URL)) {
+                target = new File(webApplicationsOverviewDir, applicationId + JSON_FILE_ENDING);
+            } else {
+                // this implicitly writes into webApplicationDir
+                target = new File(webDir, path + JSON_FILE_ENDING);
+            }
+
+            writeTargetFile(target, json);
+        }
+
+        return new ArchiveEvent(applicationId, ArchiveEventType.CREATED);
+    }
+
+    @Override
+    void deleteFromRemote(Path archive) throws IOException {
+        // delete application archive directory recursively (including all its job archives)
+        archive.getFileSystem().delete(archive, true);
+    }
+
+    @Override
+    List<ArchiveEvent> deleteCachedArchives(String archiveId, Path refreshDir) {
+        LOG.info("Archive directory for application {} was deleted.", archiveId);
+        List<ArchiveEvent> deleteLog = new ArrayList<>();
+
+        deleteLog.add(deleteApplicationFiles(archiveId));
+
+        Set<String> jobIds = cachedApplicationIdsToJobIds.get(refreshDir).remove(archiveId);
+        if (jobIds != null) {
+            jobIds.forEach(jobId -> deleteLog.add(deleteJobFiles(jobId)));
+        }
+
+        return deleteLog;
+    }
+
+    private ArchiveEvent deleteApplicationFiles(String applicationId) {
+        // Make sure we do not include this application in the overview
+        try {
+            Files.deleteIfExists(
+                    new File(webApplicationsOverviewDir, applicationId + JSON_FILE_ENDING)
+                            .toPath());
+        } catch (IOException ioe) {
+            LOG.warn("Could not delete file from overview directory.", ioe);
+        }
+
+        // Clean up application files we may have created
+        File applicationDirectory = new File(webApplicationDir, applicationId);
+        try {
+            FileUtils.deleteDirectory(applicationDirectory);
+        } catch (IOException ioe) {
+            LOG.warn("Could not clean up application directory.", ioe);
+        }
+
+        try {
+            Files.deleteIfExists(
+                    new File(webApplicationDir, applicationId + JSON_FILE_ENDING).toPath());
+        } catch (IOException ioe) {
+            LOG.warn("Could not delete file from application directory.", ioe);
+        }
+
+        return new ArchiveEvent(applicationId, ArchiveEventType.DELETED);
+    }
+
+    @Override
+    void updateOverview() {
+        updateApplicationOverview();
+        updateJobOverview();
+    }
+
+    /**
+     * This method replicates the JSON response that would be given by the
+     * ApplicationsOverviewHandler when listing applications.
+     *
+     * <p>Every application archive contains an overview entry with the same structure. Since
+     * applications are archived on their own, however, the list of applications only contains a
+     * single application.
+     *
+     * <p>For the display in the HistoryServer WebFrontend we have to combine these overviews.
+     */
+    private void updateApplicationOverview() {
+        try (JsonGenerator gen =
+                jacksonFactory.createGenerator(
+                        HistoryServer.createOrGetFile(webDir, ApplicationsOverviewHeaders.URL))) {
+            File[] overviews = new File(webApplicationsOverviewDir.getPath()).listFiles();
+            if (overviews != null) {
+                Collection<ApplicationDetails> allApplications = new ArrayList<>(overviews.length);
+                for (File overview : overviews) {
+                    MultipleApplicationsDetails subApplications =
+                            mapper.readValue(overview, MultipleApplicationsDetails.class);
+                    allApplications.addAll(subApplications.getApplications());
+                }
+                mapper.writeValue(gen, new MultipleApplicationsDetails(allApplications));
+            }
+            }
+        } catch (IOException ioe) {
+            LOG.error("Failed to update application overview.", ioe);
+        }
+    }
+}
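
To summarize the file just added: remotely it scans each refresh directory for per-cluster application archives, locally it expands them under the HistoryServer web directory. A rough sketch of both layouts follows; the "applications" and "jobs" segment names are an assumption about the ArchivePathUtils constants, whose values are not shown in this diff:

    // Remote (historyserver.archive.fs.dir), scanned by listValidArchives():
    //   <refreshDir>/<clusterId>/applications/<applicationId>/
    //       one directory per application, holding the application archive file
    //   <refreshDir>/<clusterId>/applications/<applicationId>/jobs/<jobId>
    //       the job archives belonging to that application
    //
    // Local web cache, written by this fetcher:
    //   <webDir>/applications/...                       expanded REST responses
    //   <webDir>/application-overviews/<appId>.json     one overview file per application
    //   <webDir>/jobs/..., <webDir>/overviews/...       job-side files, as before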
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 4fe8bd58d5b..59eb258b3ed 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.history.FsJsonArchivist;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
-import org.apache.flink.runtime.webmonitor.history.retaining.JobRetainedStrategy;
+import org.apache.flink.runtime.webmonitor.history.retaining.ArchiveRetainedStrategy;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.jackson.JacksonMapperFactory;
 
@@ -48,7 +48,6 @@ import java.io.StringWriter;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -68,34 +67,31 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>The archives are downloaded and expanded into a file structure analog to the REST API.
  *
- * <p>Removes existing archives from these directories and the cache if configured by {@link
- * HistoryServerOptions#HISTORY_SERVER_CLEANUP_EXPIRED_JOBS} or {@link
- * HistoryServerOptions#HISTORY_SERVER_RETAINED_JOBS}.
+ * <p>Removes existing archives from these directories and the cache according to {@link
+ * ArchiveRetainedStrategy} and {@link HistoryServerOptions#HISTORY_SERVER_CLEANUP_EXPIRED_JOBS}.
  */
 class HistoryServerArchiveFetcher {
 
-    /** Possible job archive operations in history-server. */
+    /** Possible archive operations in history-server. */
     public enum ArchiveEventType {
-        /** Job archive was found in one refresh location and created in history server. */
+        /** Archive was found in one refresh location and created in history server. */
         CREATED,
-        /**
-         * Job archive was deleted from one of refresh locations and deleted from history server.
-         */
+        /** Archive was deleted from one of refresh locations and deleted from history server. */
         DELETED
     }
 
-    /** Representation of job archive event. */
+    /** Representation of archive event. */
     public static class ArchiveEvent {
-        private final String jobID;
+        private final String id;
         private final ArchiveEventType operation;
 
-        ArchiveEvent(String jobID, ArchiveEventType operation) {
-            this.jobID = jobID;
+        ArchiveEvent(String id, ArchiveEventType operation) {
+            this.id = id;
             this.operation = operation;
         }
 
-        public String getJobID() {
-            return jobID;
+        public String getId() {
+            return id;
         }
 
         public ArchiveEventType getType() {
@@ -105,48 +101,50 @@ class HistoryServerArchiveFetcher {
 
     private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
 
-    private static final JsonFactory jacksonFactory = new JsonFactory();
-    private static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
+    protected static final String JSON_FILE_ENDING = ".json";
+    protected static final String JOBS_SUBDIR = "jobs";
+    protected static final String JOB_OVERVIEWS_SUBDIR = "overviews";
 
-    private static final String JSON_FILE_ENDING = ".json";
+    protected final JsonFactory jacksonFactory = new JsonFactory();
+    protected final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
 
-    private final List<HistoryServer.RefreshLocation> refreshDirs;
-    private final Consumer<ArchiveEvent> jobArchiveEventListener;
-    private final boolean processExpiredArchiveDeletion;
-    private final JobRetainedStrategy jobRetainedStrategy;
+    protected final List<HistoryServer.RefreshLocation> refreshDirs;
+    protected final Consumer<ArchiveEvent> archiveEventListener;
+    protected final boolean processExpiredArchiveDeletion;
+    protected final ArchiveRetainedStrategy retainedStrategy;
 
-    /** Cache of all available jobs identified by their id. */
-    private final Map<Path, Set<String>> cachedArchivesPerRefreshDirectory;
+    /** Cache of all available archives identified by their id. */
+    protected final Map<Path, Set<String>> cachedArchivesPerRefreshDirectory;
 
-    private final File webDir;
-    private final File webJobDir;
-    private final File webOverviewDir;
+    protected final File webDir;
+    protected final File webJobDir;
+    protected final File webOverviewDir;
 
     HistoryServerArchiveFetcher(
             List<HistoryServer.RefreshLocation> refreshDirs,
             File webDir,
-            Consumer<ArchiveEvent> jobArchiveEventListener,
+            Consumer<ArchiveEvent> archiveEventListener,
             boolean cleanupExpiredArchives,
-            JobRetainedStrategy jobRetainedStrategy)
+            ArchiveRetainedStrategy retainedStrategy)
             throws IOException {
         this.refreshDirs = checkNotNull(refreshDirs);
-        this.jobArchiveEventListener = jobArchiveEventListener;
+        this.archiveEventListener = archiveEventListener;
         this.processExpiredArchiveDeletion = cleanupExpiredArchives;
-        this.jobRetainedStrategy = checkNotNull(jobRetainedStrategy);
+        this.retainedStrategy = checkNotNull(retainedStrategy);
         this.cachedArchivesPerRefreshDirectory = new HashMap<>();
         for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
             cachedArchivesPerRefreshDirectory.put(refreshDir.getPath(), new HashSet<>());
         }
         this.webDir = checkNotNull(webDir);
-        this.webJobDir = new File(webDir, "jobs");
+        this.webJobDir = new File(webDir, JOBS_SUBDIR);
         Files.createDirectories(webJobDir.toPath());
-        this.webOverviewDir = new File(webDir, "overviews");
+        this.webOverviewDir = new File(webDir, JOB_OVERVIEWS_SUBDIR);
         Files.createDirectories(webOverviewDir.toPath());
-        updateJobOverview(webOverviewDir, webDir);
+        updateJobOverview();
 
         if (LOG.isInfoEnabled()) {
             for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
-                LOG.info("Monitoring directory {} for archived jobs.", 
refreshDir.getPath());
+                LOG.info("Monitoring directory {} for archives.", 
refreshDir.getPath());
             }
         }
     }
@@ -155,100 +153,102 @@ class HistoryServerArchiveFetcher {
         try {
             LOG.debug("Starting archive fetching.");
             List<ArchiveEvent> events = new ArrayList<>();
-            Map<Path, Set<String>> jobsToRemove = new HashMap<>();
+            Map<Path, Set<String>> archivesToRemove = new HashMap<>();
             cachedArchivesPerRefreshDirectory.forEach(
-                    (path, archives) -> jobsToRemove.put(path, new HashSet<>(archives)));
+                    (path, archives) -> archivesToRemove.put(path, new HashSet<>(archives)));
             Map<Path, Set<Path>> archivesBeyondRetainedLimit = new HashMap<>();
             for (HistoryServer.RefreshLocation refreshLocation : refreshDirs) {
                 Path refreshDir = refreshLocation.getPath();
                 LOG.debug("Checking archive directory {}.", refreshDir);
 
-                // contents of /:refreshDir
-                FileStatus[] jobArchives;
+                List<FileStatus> archives;
                 try {
-                    jobArchives = listArchives(refreshLocation.getFs(), refreshDir);
+                    archives = listValidArchives(refreshLocation.getFs(), refreshDir);
+                    archives.sort(
+                            Comparator.comparingLong(FileStatus::getModificationTime).reversed());
                 } catch (IOException e) {
-                    LOG.error("Failed to access job archive location for path 
{}.", refreshDir, e);
+                    LOG.error("Failed to access archive location for path 
{}.", refreshDir, e);
                     // something went wrong, potentially due to a concurrent 
deletion
-                    // do not remove any jobs now; we will retry later
-                    jobsToRemove.remove(refreshDir);
+                    // do not remove any archives now; we will retry later
+                    archivesToRemove.remove(refreshDir);
                     continue;
                 }
 
                 int fileOrderedIndexOnModifiedTime = 0;
-                for (FileStatus jobArchive : jobArchives) {
-                    Path jobArchivePath = jobArchive.getPath();
-                    String jobID = jobArchivePath.getName();
-                    if (!isValidJobID(jobID, refreshDir)) {
-                        continue;
-                    }
-
-                    jobsToRemove.get(refreshDir).remove(jobID);
+                for (FileStatus archive : archives) {
+                    Path archivePath = archive.getPath();
+                    String archiveId = archivePath.getName();
+                    archivesToRemove.get(refreshDir).remove(archiveId);
 
                     fileOrderedIndexOnModifiedTime++;
-                    if (!jobRetainedStrategy.shouldRetain(
-                            jobArchive, fileOrderedIndexOnModifiedTime)) {
+                    if (!retainedStrategy.shouldRetain(archive, fileOrderedIndexOnModifiedTime)) {
                         archivesBeyondRetainedLimit
                                 .computeIfAbsent(refreshDir, ignored -> new HashSet<>())
-                                .add(jobArchivePath);
+                                .add(archivePath);
                         continue;
                     }
 
-                    if (cachedArchivesPerRefreshDirectory.get(refreshDir).contains(jobID)) {
+                    if (cachedArchivesPerRefreshDirectory.get(refreshDir).contains(archiveId)) {
                         LOG.trace(
-                                "Ignoring archive {} because it was already fetched.",
-                                jobArchivePath);
+                                "Ignoring archive {} because it was already fetched.", archivePath);
                     } else {
-                        LOG.info("Processing archive {}.", jobArchivePath);
+                        LOG.info("Processing archive {}.", archivePath);
                         try {
-                            processArchive(jobID, jobArchivePath);
-                            events.add(new ArchiveEvent(jobID, ArchiveEventType.CREATED));
-                            cachedArchivesPerRefreshDirectory.get(refreshDir).add(jobID);
-                            LOG.info("Processing archive {} finished.", jobArchivePath);
+                            events.addAll(processArchive(archiveId, archivePath, refreshDir));
+                            cachedArchivesPerRefreshDirectory.get(refreshDir).add(archiveId);
+                            LOG.info("Processing archive {} finished.", archivePath);
                         } catch (IOException e) {
                             LOG.error(
-                                    "Failure while fetching/processing job 
archive for job {}.",
-                                    jobID,
-                                    e);
-                            deleteJobFiles(jobID);
+                                    "Failure while fetching/processing archive 
{}.", archiveId, e);
+                            deleteCachedArchives(archiveId, refreshDir);
                         }
                     }
                 }
             }
 
-            if (jobsToRemove.values().stream().flatMap(Set::stream).findAny().isPresent()
+            if (archivesToRemove.values().stream().flatMap(Set::stream).findAny().isPresent()
                     && processExpiredArchiveDeletion) {
-                events.addAll(cleanupExpiredJobs(jobsToRemove));
+                events.addAll(cleanupExpiredArchives(archivesToRemove));
             }
             if (!archivesBeyondRetainedLimit.isEmpty()) {
-                events.addAll(cleanupJobsBeyondSizeLimit(archivesBeyondRetainedLimit));
+                events.addAll(cleanupArchivesBeyondRetainedLimit(archivesBeyondRetainedLimit));
             }
             if (!events.isEmpty()) {
-                updateJobOverview(webOverviewDir, webDir);
+                updateOverview();
             }
-            events.forEach(jobArchiveEventListener::accept);
+            events.forEach(archiveEventListener);
             LOG.debug("Finished archive fetching.");
         } catch (Exception e) {
-            LOG.error("Critical failure while fetching/processing job 
archives.", e);
+            LOG.error("Critical failure while fetching/processing archives.", 
e);
         }
     }
 
-    private static FileStatus[] listArchives(FileSystem refreshFS, Path refreshDir)
+    List<FileStatus> listValidArchives(FileSystem refreshFS, Path refreshDir) throws IOException {
+        return listValidJobArchives(refreshFS, refreshDir);
+    }
+
+    List<FileStatus> listValidJobArchives(FileSystem refreshFS, Path refreshDir)
             throws IOException {
+        List<FileStatus> jobArchives = new ArrayList<>();
         // contents of /:refreshDir
-        FileStatus[] jobArchives = refreshFS.listStatus(refreshDir);
-        if (jobArchives == null) {
+        FileStatus[] archives = refreshFS.listStatus(refreshDir);
+        if (archives == null) {
             // the entire refreshDirectory was removed
-            return new FileStatus[0];
+            return jobArchives;
         }
 
-        Arrays.sort(
-                jobArchives, Comparator.comparingLong(FileStatus::getModificationTime).reversed());
+        // Check for job archive files located directly in the refresh directory and named
+        // according to the job ID format
+        for (FileStatus archive : archives) {
+            if (!archive.isDir() && isValidJobId(archive.getPath().getName(), refreshDir)) {
+                jobArchives.add(archive);
+            }
+        }
 
         return jobArchives;
     }
 
-    private static boolean isValidJobID(String jobId, Path refreshDir) {
+    boolean isValidJobId(String jobId, Path refreshDir) {
         try {
             JobID.fromHexString(jobId);
             return true;
@@ -262,98 +262,110 @@ class HistoryServerArchiveFetcher {
         }
     }
 
-    private void processArchive(String jobID, Path jobArchive) throws IOException {
+    List<ArchiveEvent> processArchive(String archiveId, Path archivePath, Path refreshDir)
+            throws IOException {
+        return Collections.singletonList(processJobArchive(archiveId, archivePath));
+    }
+
+    ArchiveEvent processJobArchive(String jobId, Path jobArchive) throws IOException {
         for (ArchivedJson archive : FsJsonArchivist.readArchivedJsons(jobArchive)) {
             String path = archive.getPath();
             String json = archive.getJson();
 
             File target;
             if (path.equals(JobsOverviewHeaders.URL)) {
-                target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
+                target = new File(webOverviewDir, jobId + JSON_FILE_ENDING);
             } else if (path.equals("/joboverview")) { // legacy path
                 LOG.debug("Migrating legacy archive {}", jobArchive);
                 json = convertLegacyJobOverview(json);
-                target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
+                target = new File(webOverviewDir, jobId + JSON_FILE_ENDING);
             } else {
                 // this implicitly writes into webJobDir
                 target = new File(webDir, path + JSON_FILE_ENDING);
             }
 
-            java.nio.file.Path parent = target.getParentFile().toPath();
+            writeTargetFile(target, json);
+        }
 
-            try {
-                Files.createDirectories(parent);
-            } catch (FileAlreadyExistsException ignored) {
-                // there may be left-over directories from the previous
-                // attempt
-            }
+        return new ArchiveEvent(jobId, ArchiveEventType.CREATED);
+    }
+
+    void writeTargetFile(File target, String json) throws IOException {
+        java.nio.file.Path parent = target.getParentFile().toPath();
+
+        try {
+            Files.createDirectories(parent);
+        } catch (FileAlreadyExistsException ignored) {
+            // there may be left-over directories from the previous attempt
+        }
 
-            java.nio.file.Path targetPath = target.toPath();
+        java.nio.file.Path targetPath = target.toPath();
 
-            // We overwrite existing files since this may be another attempt
-            // at fetching this archive.
-            // Existing files may be incomplete/corrupt.
-            Files.deleteIfExists(targetPath);
+        // We overwrite existing files since this may be another attempt
+        // at fetching this archive.
+        // Existing files may be incomplete/corrupt.
+        Files.deleteIfExists(targetPath);
 
-            Files.createFile(target.toPath());
-            try (FileWriter fw = new FileWriter(target)) {
-                fw.write(json);
-                fw.flush();
-            }
+        Files.createFile(target.toPath());
+        try (FileWriter fw = new FileWriter(target)) {
+            fw.write(json);
+            fw.flush();
         }
     }
 
-    private List<ArchiveEvent> cleanupJobsBeyondSizeLimit(
-            Map<Path, Set<Path>> jobArchivesToRemove) {
-        Map<Path, Set<String>> allJobIdsToRemoveFromOverview = new HashMap<>();
+    List<ArchiveEvent> cleanupArchivesBeyondRetainedLimit(Map<Path, Set<Path>> archivesToRemove) {
+        Map<Path, Set<String>> allArchiveIdsToRemove = new HashMap<>();
 
-        for (Map.Entry<Path, Set<Path>> pathSetEntry : jobArchivesToRemove.entrySet()) {
-            HashSet<String> jobIdsToRemoveFromOverview = new HashSet<>();
+        for (Map.Entry<Path, Set<Path>> pathSetEntry : archivesToRemove.entrySet()) {
+            HashSet<String> archiveIdsToRemove = new HashSet<>();
 
             for (Path archive : pathSetEntry.getValue()) {
-                jobIdsToRemoveFromOverview.add(archive.getName());
+                archiveIdsToRemove.add(archive.getName());
                 try {
-                    archive.getFileSystem().delete(archive, false);
+                    deleteFromRemote(archive);
                 } catch (IOException ioe) {
-                    LOG.warn("Could not delete old archive " + archive, ioe);
+                    LOG.warn("Could not delete old archive {}", archive, ioe);
                 }
             }
-            allJobIdsToRemoveFromOverview.put(pathSetEntry.getKey(), jobIdsToRemoveFromOverview);
+            allArchiveIdsToRemove.put(pathSetEntry.getKey(), archiveIdsToRemove);
         }
 
-        return cleanupExpiredJobs(allJobIdsToRemoveFromOverview);
+        return cleanupExpiredArchives(allArchiveIdsToRemove);
     }
 
-    private List<ArchiveEvent> cleanupExpiredJobs(Map<Path, Set<String>> jobsToRemove) {
+    void deleteFromRemote(Path archive) throws IOException {
+        archive.getFileSystem().delete(archive, false);
+    }
 
+    List<ArchiveEvent> cleanupExpiredArchives(Map<Path, Set<String>> archivesToRemove) {
         List<ArchiveEvent> deleteLog = new ArrayList<>();
-        LOG.info("Archive directories for jobs {} were deleted.", 
jobsToRemove);
 
-        jobsToRemove.forEach(
-                (refreshDir, archivesToRemove) -> {
-                    cachedArchivesPerRefreshDirectory.get(refreshDir).removeAll(archivesToRemove);
+        archivesToRemove.forEach(
+                (refreshDir, archives) -> {
+                    cachedArchivesPerRefreshDirectory.get(refreshDir).removeAll(archives);
+                    archives.forEach(
+                            archiveId ->
+                                    deleteLog.addAll(deleteCachedArchives(archiveId, refreshDir)));
                 });
-        jobsToRemove.values().stream()
-                .flatMap(Set::stream)
-                .forEach(
-                        removedJobID -> {
-                            deleteJobFiles(removedJobID);
-                            deleteLog.add(new ArchiveEvent(removedJobID, ArchiveEventType.DELETED));
-                        });
 
         return deleteLog;
     }
 
-    private void deleteJobFiles(String jobID) {
+    List<ArchiveEvent> deleteCachedArchives(String archiveId, Path refreshDir) {
+        LOG.info("Archive directory for job {} was deleted.", archiveId);
+        return Collections.singletonList(deleteJobFiles(archiveId));
+    }
+
+    ArchiveEvent deleteJobFiles(String jobId) {
         // Make sure we do not include this job in the overview
         try {
-            Files.deleteIfExists(new File(webOverviewDir, jobID + JSON_FILE_ENDING).toPath());
+            Files.deleteIfExists(new File(webOverviewDir, jobId + JSON_FILE_ENDING).toPath());
         } catch (IOException ioe) {
             LOG.warn("Could not delete file from overview directory.", ioe);
         }
 
         // Clean up job files we may have created
-        File jobDirectory = new File(webJobDir, jobID);
+        File jobDirectory = new File(webJobDir, jobId);
         try {
             FileUtils.deleteDirectory(jobDirectory);
         } catch (IOException ioe) {
@@ -361,13 +373,15 @@ class HistoryServerArchiveFetcher {
         }
 
         try {
-            Files.deleteIfExists(new File(webJobDir, jobID + JSON_FILE_ENDING).toPath());
+            Files.deleteIfExists(new File(webJobDir, jobId + JSON_FILE_ENDING).toPath());
         } catch (IOException ioe) {
             LOG.warn("Could not delete file from job directory.", ioe);
         }
+
+        return new ArchiveEvent(jobId, ArchiveEventType.DELETED);
     }
 
-    private static String convertLegacyJobOverview(String legacyOverview) throws IOException {
+    private String convertLegacyJobOverview(String legacyOverview) throws IOException {
         JsonNode root = mapper.readTree(legacyOverview);
         JsonNode finishedJobs = root.get("finished");
         JsonNode job = finishedJobs.get(0);
@@ -436,6 +450,10 @@ class HistoryServerArchiveFetcher {
         return sw.toString();
     }
 
+    void updateOverview() {
+        updateJobOverview();
+    }
+
     /**
      * This method replicates the JSON response that would be given by the JobsOverviewHandler when
      * listing both running and finished jobs.
@@ -445,7 +463,7 @@ class HistoryServerArchiveFetcher {
      *
      * <p>For the display in the HistoryServer WebFrontend we have to combine these overviews.
      */
-    private static void updateJobOverview(File webOverviewDir, File webDir) {
+    void updateJobOverview() {
         try (JsonGenerator gen =
                 jacksonFactory.createGenerator(
                         HistoryServer.createOrGetFile(webDir, JobsOverviewHeaders.URL))) {
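
Taken together with the subclass shown earlier, updateOverview() is the template-method hook of this refactoring; a condensed view of who refreshes what:

    // Base class (legacy job archives):
    //   updateOverview() -> updateJobOverview()
    // HistoryServerApplicationArchiveFetcher (see above):
    //   updateOverview() -> updateApplicationOverview() + updateJobOverview()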
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobRetainedStrategy.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/ArchiveRetainedStrategy.java
similarity index 96%
rename from flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobRetainedStrategy.java
rename to flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/ArchiveRetainedStrategy.java
index 2ef991698bf..f2e50dd73c5 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobRetainedStrategy.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/ArchiveRetainedStrategy.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.webmonitor.history.retaining;
 import org.apache.flink.core.fs.FileStatus;
 
 /** To define the strategy interface to judge whether the file should be retained. */
-public interface JobRetainedStrategy {
+public interface ArchiveRetainedStrategy {
 
     /**
      * Judge whether the file should be retained.
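
Since the renamed interface keeps a single shouldRetain(FileStatus, int) hook (the signature is inferred from its callers in this commit), a custom strategy can be written as a lambda, assuming the interface stays functional:

    // Hedged sketch: retain only archives modified within the last hour,
    // regardless of the retained-count index.
    ArchiveRetainedStrategy lastHourOnly =
            (fileStatus, indexByNewestFirst) ->
                    System.currentTimeMillis() - fileStatus.getModificationTime()
                            < 60 * 60 * 1000L; // one hour in milliseconds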
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategy.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategy.java
similarity index 63%
rename from flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategy.java
rename to flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategy.java
index acd35c93f79..2a38dfaeff7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategy.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategy.java
@@ -31,23 +31,42 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
+import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS;
 import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS;
 import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_TTL;
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** A composite retained strategy that retains a file only if every sub-strategy agrees. */
-public class CompositeJobRetainedStrategy implements JobRetainedStrategy {
+public class CompositeArchiveRetainedStrategy implements ArchiveRetainedStrategy {
 
-    public static JobRetainedStrategy createFrom(ReadableConfig config) {
+    public static ArchiveRetainedStrategy createForJobFromConfig(ReadableConfig config) {
         int maxHistorySizeByOldKey = config.get(HISTORY_SERVER_RETAINED_JOBS);
+        if (maxHistorySizeByOldKey == 0 || maxHistorySizeByOldKey < -1) {
+            throw new IllegalConfigurationException(
+                    "Cannot set %s to 0 or less than -1", HISTORY_SERVER_RETAINED_JOBS.key());
+        }
         Optional<Duration> retainedTtlOpt = config.getOptional(HISTORY_SERVER_RETAINED_TTL);
-        return new CompositeJobRetainedStrategy(
-                new QuantityJobRetainedStrategy(maxHistorySizeByOldKey),
-                new TimeToLiveJobRetainedStrategy(retainedTtlOpt.orElse(null)));
+        return new CompositeArchiveRetainedStrategy(
+                new QuantityArchiveRetainedStrategy(maxHistorySizeByOldKey),
+                new TimeToLiveArchiveRetainedStrategy(retainedTtlOpt.orElse(null)));
     }
 
-    private final List<JobRetainedStrategy> strategies;
+    public static ArchiveRetainedStrategy createForApplicationFromConfig(ReadableConfig config) {
+        int maxHistorySize = config.get(HISTORY_SERVER_RETAINED_APPLICATIONS);
+        if (maxHistorySize == 0 || maxHistorySize < -1) {
+            throw new IllegalConfigurationException(
+                    "Cannot set %s to 0 or less than -1",
+                    HISTORY_SERVER_RETAINED_APPLICATIONS.key());
+        }
+        Optional<Duration> retainedTtlOpt = config.getOptional(HISTORY_SERVER_RETAINED_TTL);
+        return new CompositeArchiveRetainedStrategy(
+                new QuantityArchiveRetainedStrategy(maxHistorySize),
+                new TimeToLiveArchiveRetainedStrategy(retainedTtlOpt.orElse(null)));
+    }
+
+    private final List<ArchiveRetainedStrategy> strategies;
 
-    CompositeJobRetainedStrategy(JobRetainedStrategy... strategies) {
+    CompositeArchiveRetainedStrategy(ArchiveRetainedStrategy... strategies) {
         this.strategies =
                 strategies == null || strategies.length == 0
                         ? Collections.emptyList()
@@ -64,11 +83,11 @@ public class CompositeJobRetainedStrategy implements JobRetainedStrategy {
 }
 
 /** The time-to-live based retained strategy. */
-class TimeToLiveJobRetainedStrategy implements JobRetainedStrategy {
+class TimeToLiveArchiveRetainedStrategy implements ArchiveRetainedStrategy {
 
     @Nullable private final Duration ttlThreshold;
 
-    TimeToLiveJobRetainedStrategy(Duration ttlThreshold) {
+    TimeToLiveArchiveRetainedStrategy(@Nullable Duration ttlThreshold) {
         if (ttlThreshold != null && ttlThreshold.toMillis() <= 0) {
             throw new IllegalConfigurationException(
                     "Cannot set %s to 0 or less than 0 milliseconds",
@@ -86,16 +105,13 @@ class TimeToLiveJobRetainedStrategy implements JobRetainedStrategy {
     }
 }
 
-/** The job quantity based retained strategy. */
-class QuantityJobRetainedStrategy implements JobRetainedStrategy {
+/** The quantity-based retained strategy. */
+class QuantityArchiveRetainedStrategy implements ArchiveRetainedStrategy {
 
     private final int quantityThreshold;
 
-    QuantityJobRetainedStrategy(int quantityThreshold) {
-        if (quantityThreshold == 0 || quantityThreshold < -1) {
-            throw new IllegalConfigurationException(
-                    "Cannot set %s to 0 or less than -1", 
HISTORY_SERVER_RETAINED_JOBS.key());
-        }
+    QuantityArchiveRetainedStrategy(int quantityThreshold) {
+        checkArgument(quantityThreshold == -1 || quantityThreshold > 0);
         this.quantityThreshold = quantityThreshold;
     }
 
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 537d58fae8e..c67a3147628 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -18,19 +18,32 @@
 
 package org.apache.flink.runtime.webmonitor.history;
 
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.ApplicationState;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HistoryServerOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.application.ArchivedApplication;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.history.ArchivePathUtils;
 import org.apache.flink.runtime.history.FsJsonArchivist;
+import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails;
+import org.apache.flink.runtime.messages.webmonitor.ApplicationDetailsInfo;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleApplicationsDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter;
+import org.apache.flink.runtime.rest.messages.ApplicationsOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
 import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.application.ApplicationDetailsHeaders;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.runtime.webmonitor.testutils.HttpUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -51,6 +64,8 @@ import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.StringWriter;
@@ -58,11 +73,14 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -86,13 +104,14 @@ class HistoryServerTest {
     private MiniClusterWithClientResource cluster;
     private File jmDirectory;
     private File hsDirectory;
+    private Configuration clusterConfig;
 
     @BeforeEach
     void setUp(@TempDir File jmDirectory, @TempDir File hsDirectory) throws Exception {
         this.jmDirectory = jmDirectory;
         this.hsDirectory = hsDirectory;
 
-        Configuration clusterConfig = new Configuration();
+        clusterConfig = new Configuration();
         clusterConfig.set(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString());
 
         cluster =
@@ -130,6 +149,9 @@ class HistoryServerTest {
                                     == HistoryServerArchiveFetcher.ArchiveEventType.CREATED) {
                                 numExpectedArchivedJobs.countDown();
                             }
+                        },
+                        (event) -> {
+                            throw new RuntimeException("Should not call");
                         });
 
         try {
@@ -164,10 +186,10 @@ class HistoryServerTest {
         final int numArchivesToRemoveUponHsStart =
                 numArchivesBeforeHsStarted - numArchivesToKeepInHistory;
         final long oneMinuteSinceEpoch = 1000L * 60L;
-        List<String> expectedJobIdsToKeep = new LinkedList<>();
+        List<JobID> expectedJobIdsToKeep = new LinkedList<>();
 
         for (int j = 0; j < numArchivesBeforeHsStarted; j++) {
-            String jobId =
+            JobID jobId =
                     createLegacyArchive(
                             jmDirectory.toPath(), j * oneMinuteSinceEpoch, versionLessThan14);
             if (j >= numArchivesToRemoveUponHsStart) {
@@ -205,6 +227,9 @@ class HistoryServerTest {
                                     numArchivesDeletedTotal.countDown();
                                     break;
                             }
+                        },
+                        (event) -> {
+                            throw new RuntimeException("Should not call");
                         });
 
         try {
@@ -232,10 +257,9 @@ class HistoryServerTest {
         }
     }
 
-    private Set<String> getIdsFromJobOverview(String baseUrl) throws Exception {
+    private Set<JobID> getIdsFromJobOverview(String baseUrl) throws Exception {
         return getJobsOverview(baseUrl).getJobs().stream()
                 .map(JobDetails::getJobId)
-                .map(JobID::toString)
                 .collect(Collectors.toSet());
     }
 
@@ -288,15 +312,26 @@ class HistoryServerTest {
         new File(hsDirectory.toURI() + "/overviews/dirtyEmptySubFile.json").createNewFile();
         new File(hsDirectory.toURI() + "/jobs/dirtyEmptySubDir").mkdir();
         new File(hsDirectory.toURI() + "/jobs/dirtyEmptySubFile.json").createNewFile();
+        new File(hsDirectory.toURI() + "/application-overviews/dirtyEmptySubDir").mkdir();
+        new File(hsDirectory.toURI() + "/application-overviews/dirtyEmptySubFile.json")
+                .createNewFile();
+        new File(hsDirectory.toURI() + "/applications/dirtyEmptySubDir").mkdir();
+        new File(hsDirectory.toURI() + "/applications/dirtyEmptySubFile.json").createNewFile();
         hs = new HistoryServer(historyServerConfig);
         assertInitializedHistoryServerWebDir(hs.getWebDir());
     }
 
     private void assertInitializedHistoryServerWebDir(File historyWebDir) {
-
-        assertThat(historyWebDir.list()).containsExactlyInAnyOrder("overviews", "jobs");
+        assertThat(historyWebDir.list())
+                .containsExactlyInAnyOrder(
+                        "overviews", "jobs", "application-overviews", "applications");
         assertThat(new File(historyWebDir, "overviews")).exists().isDirectory().isEmptyDirectory();
         assertThat(new File(historyWebDir, "jobs").list()).containsExactly("overview.json");
+        assertThat(new File(historyWebDir, "application-overviews"))
+                .exists()
+                .isDirectory()
+                .isEmptyDirectory();
+        assertThat(new File(historyWebDir, "applications").list()).containsExactly("overview.json");
     }
 
     private void runArchiveExpirationTest(boolean cleanupExpiredJobs) throws Exception {
@@ -327,6 +362,9 @@ class HistoryServerTest {
                                     allArchivesExpiredLatch.countDown();
                                     break;
                             }
+                        },
+                        (event) -> {
+                            throw new RuntimeException("Should not call");
                         });
 
         try {
@@ -378,27 +416,33 @@ class HistoryServerTest {
 
             assertThat(allArchivesExpiredLatch.await(10L, TimeUnit.SECONDS)).isTrue();
 
-            assertJobFilesCleanedUp(cleanupExpiredJobs);
+            assertFilesCleanedUp(cleanupExpiredJobs);
         } finally {
             hs.stop();
         }
     }
 
-    private void assertJobFilesCleanedUp(boolean jobFilesShouldBeDeleted) throws IOException {
+    private void assertFilesCleanedUp(boolean filesShouldBeDeleted) throws IOException {
         try (Stream<Path> paths = Files.walk(hsDirectory.toPath())) {
-            final List<Path> jobFiles =
+            final List<Path> applicationOrJobFiles =
                     paths.filter(path -> !path.equals(hsDirectory.toPath()))
                             .map(path -> hsDirectory.toPath().relativize(path))
                             .filter(path -> !path.equals(Paths.get("config.json")))
                             .filter(path -> !path.equals(Paths.get("jobs")))
                             .filter(path -> !path.equals(Paths.get("jobs", "overview.json")))
                             .filter(path -> !path.equals(Paths.get("overviews")))
+                            .filter(path -> !path.equals(Paths.get("applications")))
+                            .filter(
+                                    path ->
+                                            !path.equals(
+                                                    Paths.get("applications", "overview.json")))
+                            .filter(path -> !path.equals(Paths.get("application-overviews")))
                             .collect(Collectors.toList());
 
-            if (jobFilesShouldBeDeleted) {
-                assertThat(jobFiles).isEmpty();
+            if (filesShouldBeDeleted) {
+                assertThat(applicationOrJobFiles).isEmpty();
             } else {
-                assertThat(jobFiles).isNotEmpty();
+                assertThat(applicationOrJobFiles).isNotEmpty();
             }
         }
     }
@@ -413,6 +457,13 @@ class HistoryServerTest {
     }
 
     private Configuration createTestConfiguration(boolean cleanupExpiredJobs) {
+        return createTestConfiguration(
+                cleanupExpiredJobs,
+                HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS.defaultValue());
+    }
+
+    private Configuration createTestConfiguration(
+            boolean cleanupExpiredJobs, boolean cleanupExpiredApplications) {
         Configuration historyServerConfig = new Configuration();
         historyServerConfig.set(
                 HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString());
@@ -424,6 +475,9 @@ class HistoryServerTest {
 
         historyServerConfig.set(
                 HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS, cleanupExpiredJobs);
+        historyServerConfig.set(
+                HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS,
+                cleanupExpiredApplications);
 
         historyServerConfig.set(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0);
         return historyServerConfig;
@@ -451,15 +505,15 @@ class HistoryServerTest {
         env.execute();
     }
 
-    private static String createLegacyArchive(
+    private static JobID createLegacyArchive(
             Path directory, long fileModifiedDate, boolean versionLessThan14) throws IOException {
-        String jobId = createLegacyArchive(directory, versionLessThan14);
-        File jobArchive = directory.resolve(jobId).toFile();
+        JobID jobId = createLegacyArchive(directory, versionLessThan14);
+        File jobArchive = directory.resolve(jobId.toString()).toFile();
         jobArchive.setLastModified(fileModifiedDate);
         return jobId;
     }
 
-    private static String createLegacyArchive(Path directory, boolean versionLessThan14)
+    private static JobID createLegacyArchive(Path directory, boolean versionLessThan14)
             throws IOException {
         JobID jobId = JobID.generate();
 
@@ -504,7 +558,405 @@ class HistoryServerTest {
                         directory.toAbsolutePath().toString(), jobId.toString()),
                 Collections.singleton(archivedJson));
 
-        return jobId.toString();
+        return jobId;
+    }
+
+    @Test
+    void testApplicationAndJobArchives() throws Exception {
+        int numApplications = 2;
+        int numJobsPerApplication = 2;
+        // jobs that are not part of an application
+        int numJobsOutsideApplication = 1;
+
+        Map<ApplicationID, Set<JobID>> expectedApplicationAndJobIds =
+                new HashMap<>(numApplications);
+        for (int i = 0; i < numApplications; i++) {
+            ArchivedApplication archivedApplication = mockApplicationArchive(numJobsPerApplication);
+            ApplicationID applicationId = archivedApplication.getApplicationId();
+            List<JobID> jobIds =
+                    archivedApplication.getJobs().values().stream()
+                            .map(ExecutionGraphInfo::getJobId)
+                            .collect(Collectors.toList());
+            expectedApplicationAndJobIds.put(applicationId, new HashSet<>(jobIds));
+        }
+        Set<JobID> expectedJobIdsOutsideApplication = new HashSet<>(numJobsOutsideApplication);
+        for (int i = 0; i < numJobsOutsideApplication; i++) {
+            ExecutionGraphInfo executionGraphInfo = createExecutionGraphInfo();
+            mockJobArchive(executionGraphInfo, null);
+            expectedJobIdsOutsideApplication.add(executionGraphInfo.getJobId());
+        }
+
+        int numTotalJobs = numApplications * numJobsPerApplication + numJobsOutsideApplication;
+        int numTotal = numApplications + numTotalJobs;
+        CountDownLatch numExpectedArchives = new CountDownLatch(numTotal);
+        Configuration historyServerConfig = createTestConfiguration(false);
+        HistoryServer hs =
+                new HistoryServer(
+                        historyServerConfig,
+                        (event) -> {
+                            if (event.getType()
+                                    == HistoryServerArchiveFetcher.ArchiveEventType.CREATED) {
+                                numExpectedArchives.countDown();
+                            }
+                        },
+                        (event) -> {
+                            if (event.getType()
+                                    == HistoryServerApplicationArchiveFetcher.ArchiveEventType
+                                            .CREATED) {
+                                numExpectedArchives.countDown();
+                            }
+                        });
+        try {
+            hs.start();
+            String baseUrl = "http://localhost:" + hs.getWebPort();
+            assertThat(numExpectedArchives.await(10L, TimeUnit.SECONDS)).isTrue();
+            assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+                    .isEqualTo(expectedApplicationAndJobIds);
+            Set<JobID> expectedJobIds =
+                    Stream.concat(
+                                    expectedApplicationAndJobIds.values().stream()
+                                            .flatMap(Set::stream),
+                                    expectedJobIdsOutsideApplication.stream())
+                            .collect(Collectors.toSet());
+            assertThat(getIdsFromJobOverview(baseUrl)).isEqualTo(expectedJobIds);
+            // checks whether the dashboard configuration contains all expected fields
+            getDashboardConfiguration(baseUrl);
+        } finally {
+            hs.stop();
+        }
+    }
+
+    @Test
+    void testRemoveApplicationArchivesBeyondHistorySizeLimit() throws Exception {
+        int numJobsPerApplication = 1;
+        int numApplicationsToKeepInHistory = 2;
+        int numApplicationsBeforeHsStarted = 4;
+        int numApplicationsAfterHsStarted = 2;
+        int numApplicationsToRemoveUponHsStart =
+                numApplicationsBeforeHsStarted - numApplicationsToKeepInHistory;
+        List<Tuple2<ApplicationID, Set<JobID>>> expectedApplicationAndJobIdsToKeep =
+                new LinkedList<>();
+        for (int i = 0; i < numApplicationsBeforeHsStarted; i++) {
+            ArchivedApplication archivedApplication = mockApplicationArchive(numJobsPerApplication);
+            ApplicationID applicationId = archivedApplication.getApplicationId();
+            List<JobID> jobIds =
+                    archivedApplication.getJobs().values().stream()
+                            .map(ExecutionGraphInfo::getJobId)
+                            .collect(Collectors.toList());
+            if (i >= numApplicationsToRemoveUponHsStart) {
+                expectedApplicationAndJobIdsToKeep.add(
+                        new Tuple2<>(applicationId, new HashSet<>(jobIds)));
+            }
+        }
+
+        // one for application itself, numJobsPerApplication for jobs
+        int numArchivesRatio = 1 + numJobsPerApplication;
+        CountDownLatch numArchivesCreatedInitially =
+                new CountDownLatch(numApplicationsToKeepInHistory * numArchivesRatio);
+        // jobs in applications that exceed the size limit are not read by the fetcher at all,
+        // so there is no need to delete these jobs.
+        CountDownLatch numArchivesDeletedInitially =
+                new CountDownLatch(numApplicationsToRemoveUponHsStart);
+        CountDownLatch numArchivesCreatedTotal =
+                new CountDownLatch(
+                        (numApplicationsBeforeHsStarted
+                                        - numApplicationsToRemoveUponHsStart
+                                        + numApplicationsAfterHsStarted)
+                                * numArchivesRatio);
+        CountDownLatch numArchivesDeletedTotal =
+                new CountDownLatch(
+                        numApplicationsToRemoveUponHsStart
+                                + numApplicationsAfterHsStarted * numArchivesRatio);
+        Configuration historyServerConfig =
+                createTestConfiguration(
+                        HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS.defaultValue());
+        historyServerConfig.set(
+                HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS,
+                numApplicationsToKeepInHistory);
+        HistoryServer hs =
+                new HistoryServer(
+                        historyServerConfig,
+                        (event) -> {
+                            throw new RuntimeException("Should not call");
+                        },
+                        (event) -> {
+                            if (event.getType()
+                                    == HistoryServerApplicationArchiveFetcher.ArchiveEventType
+                                            .CREATED) {
+                                numArchivesCreatedInitially.countDown();
+                                numArchivesCreatedTotal.countDown();
+                            } else if (event.getType()
+                                    == HistoryServerApplicationArchiveFetcher.ArchiveEventType
+                                            .DELETED) {
+                                numArchivesDeletedInitially.countDown();
+                                numArchivesDeletedTotal.countDown();
+                            }
+                        });
+        try {
+            hs.start();
+            String baseUrl = "http://localhost:" + hs.getWebPort();
+            assertThat(numArchivesCreatedInitially.await(10L, TimeUnit.SECONDS)).isTrue();
+            assertThat(numArchivesDeletedInitially.await(10L, TimeUnit.SECONDS)).isTrue();
+            assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+                    .isEqualTo(
+                            expectedApplicationAndJobIdsToKeep.stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    tuple -> tuple.f0, tuple -> tuple.f1)));
+            for (int i = numApplicationsBeforeHsStarted;
+                    i < numApplicationsBeforeHsStarted + numApplicationsAfterHsStarted;
+                    i++) {
+                expectedApplicationAndJobIdsToKeep.remove(0);
+                ArchivedApplication archivedApplication =
+                        mockApplicationArchive(numJobsPerApplication);
+                ApplicationID applicationId = archivedApplication.getApplicationId();
+                List<JobID> jobIds =
+                        archivedApplication.getJobs().values().stream()
+                                .map(ExecutionGraphInfo::getJobId)
+                                .collect(Collectors.toList());
+                expectedApplicationAndJobIdsToKeep.add(
+                        new Tuple2<>(applicationId, new HashSet<>(jobIds)));
+                // sleep briefly so that consecutive archive files do not share a creation time
+                Thread.sleep(50);
+            }
+
+            assertThat(numArchivesCreatedTotal.await(10L, TimeUnit.SECONDS)).isTrue();
+            assertThat(numArchivesDeletedTotal.await(10L, TimeUnit.SECONDS)).isTrue();
+            assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+                    .isEqualTo(
+                            expectedApplicationAndJobIdsToKeep.stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    tuple -> tuple.f0, tuple -> tuple.f1)));
+        } finally {
+            hs.stop();
+        }
+    }
+
+    @Test
+    void testFailIfApplicationHistorySizeLimitIsZero() {
+        assertThatThrownBy(() -> startHistoryServerWithApplicationSizeLimit(0))
+                .isInstanceOf(IllegalConfigurationException.class);
+    }
+
+    @Test
+    void testFailIfApplicationHistorySizeLimitIsLessThanMinusOne() {
+        assertThatThrownBy(() -> startHistoryServerWithApplicationSizeLimit(-2))
+                .isInstanceOf(IllegalConfigurationException.class);
+    }
+
+    private void startHistoryServerWithApplicationSizeLimit(int maxHistorySize)
+            throws IOException, FlinkException, InterruptedException {
+        Configuration historyServerConfig =
+                createTestConfiguration(
+                        HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS
+                                .defaultValue());
+        historyServerConfig.set(
+                HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS, maxHistorySize);
+        new HistoryServer(historyServerConfig).start();
+    }
+
+    @Test
+    void testCleanExpiredApplication() throws Exception {
+        runApplicationArchiveExpirationTest(true);
+    }
+
+    @Test
+    void testRemainExpiredApplication() throws Exception {
+        runApplicationArchiveExpirationTest(false);
+    }
+
+    private void runApplicationArchiveExpirationTest(boolean cleanupExpiredApplications)
+            throws Exception {
+        int numExpiredApplications = cleanupExpiredApplications ? 1 : 0;
+        int numApplications = 3;
+        int numJobsPerApplication = 1;
+
+        Map<ApplicationID, Set<JobID>> expectedApplicationAndJobIds =
+                new HashMap<>(numApplications);
+        for (int i = 0; i < numApplications; i++) {
+            ArchivedApplication archivedApplication = mockApplicationArchive(numJobsPerApplication);
+            ApplicationID applicationId = archivedApplication.getApplicationId();
+            List<JobID> jobIds =
+                    archivedApplication.getJobs().values().stream()
+                            .map(ExecutionGraphInfo::getJobId)
+                            .collect(Collectors.toList());
+            expectedApplicationAndJobIds.put(applicationId, new HashSet<>(jobIds));
+        }
+
+        // one for application itself, numJobsPerApplication for jobs
+        int numArchivesRatio = 1 + numJobsPerApplication;
+        CountDownLatch numExpectedArchives = new CountDownLatch(numApplications * numArchivesRatio);
+        CountDownLatch firstArchiveExpiredLatch =
+                new CountDownLatch(numExpiredApplications * numArchivesRatio);
+        CountDownLatch allArchivesExpiredLatch =
+                new CountDownLatch(
+                        cleanupExpiredApplications ? numApplications * numArchivesRatio : 0);
+
+        Configuration historyServerConfig =
+                createTestConfiguration(
+                        HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS.defaultValue(),
+                        cleanupExpiredApplications);
+        HistoryServer hs =
+                new HistoryServer(
+                        historyServerConfig,
+                        (event) -> {
+                            throw new RuntimeException("Should not call");
+                        },
+                        (event) -> {
+                            if (event.getType()
+                                    == HistoryServerApplicationArchiveFetcher.ArchiveEventType
+                                            .CREATED) {
+                                numExpectedArchives.countDown();
+                            } else if (event.getType()
+                                    == HistoryServerApplicationArchiveFetcher.ArchiveEventType
+                                            .DELETED) {
+                                firstArchiveExpiredLatch.countDown();
+                                allArchivesExpiredLatch.countDown();
+                            }
+                        });
+        try {
+            hs.start();
+            String baseUrl = "http://localhost:" + hs.getWebPort();
+            assertThat(numExpectedArchives.await(10L, TimeUnit.SECONDS)).isTrue();
+            assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+                    .isEqualTo(expectedApplicationAndJobIds);
+            ApplicationID applicationIdToDelete =
+                    expectedApplicationAndJobIds.keySet().stream()
+                            .findFirst()
+                            .orElseThrow(
+                                    () ->
+                                            new IllegalStateException(
+                                                    "Expected at least one 
application"));
+            if (cleanupExpiredApplications) {
+                expectedApplicationAndJobIds.remove(applicationIdToDelete);
+            }
+            // trigger another fetch and delete one archive from jm
+            // we fetch again to probabilistically cause a concurrent deletion
+            hs.fetchArchives();
+            deleteApplicationArchiveDir(applicationIdToDelete);
+
+            assertThat(firstArchiveExpiredLatch.await(10L, TimeUnit.SECONDS)).isTrue();
+            // check that archive is still/no longer present in hs
+            assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+                    .isEqualTo(expectedApplicationAndJobIds);
+            for (ApplicationID remainingApplicationId : expectedApplicationAndJobIds.keySet()) {
+                deleteApplicationArchiveDir(remainingApplicationId);
+            }
+            assertThat(allArchivesExpiredLatch.await(10L, TimeUnit.SECONDS)).isTrue();
+            assertFilesCleanedUp(cleanupExpiredApplications);
+        } finally {
+            hs.stop();
+        }
+    }
+
+    private Map<ApplicationID, Set<JobID>> getApplicationAndJobIdsFromApplicationOverview(
+            String baseUrl) throws Exception {
+        Set<ApplicationID> applicationIds =
+                getApplicationsOverview(baseUrl).getApplications().stream()
+                        .map(ApplicationDetails::getApplicationId)
+                        .collect(Collectors.toSet());
+        Map<ApplicationID, Set<JobID>> applicationAndJobIds = new HashMap<>(applicationIds.size());
+        for (ApplicationID applicationId : applicationIds) {
+            Set<JobID> jobIds =
+                    getApplicationDetails(baseUrl, applicationId).getJobs().stream()
+                            .map(JobDetails::getJobId)
+                            .collect(Collectors.toSet());
+            applicationAndJobIds.put(applicationId, jobIds);
+        }
+        return applicationAndJobIds;
+    }
+
+    private static MultipleApplicationsDetails getApplicationsOverview(String baseUrl)
+            throws Exception {
+        Tuple2<Integer, String> response =
+                HttpUtils.getFromHTTP(baseUrl + ApplicationsOverviewHeaders.URL);
+        return OBJECT_MAPPER.readValue(response.f1, MultipleApplicationsDetails.class);
+    }
+
+    private static ApplicationDetailsInfo getApplicationDetails(
+            String baseUrl, ApplicationID applicationId) throws Exception {
+        Tuple2<Integer, String> response =
+                HttpUtils.getFromHTTP(
+                        baseUrl
+                                + ApplicationDetailsHeaders.URL.replace(
+                                        ':' + ApplicationIDPathParameter.KEY,
+                                        applicationId.toString()));
+        return OBJECT_MAPPER.readValue(response.f1, ApplicationDetailsInfo.class);
+    }
+
+    private ArchivedApplication mockApplicationArchive(int numJobs) throws IOException {
+        ArchivedApplication archivedApplication = createArchivedApplication(numJobs);
+        ApplicationID applicationId = archivedApplication.getApplicationId();
+        ArchivedJson archivedApplicationsOverview =
+                new ArchivedJson(
+                        ApplicationsOverviewHeaders.URL,
+                        new MultipleApplicationsDetails(
+                                Collections.singleton(
+                                        ApplicationDetails.fromArchivedApplication(
+                                                archivedApplication))));
+        ArchivedJson archivedApplicationDetails =
+                new ArchivedJson(
+                        ApplicationDetailsHeaders.URL.replace(
+                                ':' + ApplicationIDPathParameter.KEY, applicationId.toString()),
+                        ApplicationDetailsInfo.fromArchivedApplication(archivedApplication));
+        // set cluster id to application id to simplify the test
+        clusterConfig.set(ClusterOptions.CLUSTER_ID, applicationId.toString());
+        FsJsonArchivist.writeArchivedJsons(
+                ArchivePathUtils.getApplicationArchivePath(clusterConfig, applicationId),
+                Arrays.asList(archivedApplicationsOverview, archivedApplicationDetails));
+
+        Map<JobID, ExecutionGraphInfo> jobs = archivedApplication.getJobs();
+        for (Map.Entry<JobID, ExecutionGraphInfo> jobEntry : jobs.entrySet()) {
+            mockJobArchive(jobEntry.getValue(), applicationId);
+        }
+        return archivedApplication;
+    }
+
+    private void mockJobArchive(
+            ExecutionGraphInfo executionGraphInfo, @Nullable ApplicationID applicationId)
+            throws IOException {
+        JobID jobId = executionGraphInfo.getJobId();
+        ArchivedJson archivedJobsOverview =
+                new ArchivedJson(
+                        JobsOverviewHeaders.URL,
+                        new MultipleJobsDetails(
+                                Collections.singleton(
+                                        JobDetails.createDetailsForJob(
+                                                executionGraphInfo.getArchivedExecutionGraph()))));
+        FsJsonArchivist.writeArchivedJsons(
+                ArchivePathUtils.getJobArchivePath(clusterConfig, jobId, applicationId),
+                Collections.singletonList(archivedJobsOverview));
+    }
+
+    private ArchivedApplication createArchivedApplication(int numJobs) {
+        ApplicationID applicationId = ApplicationID.generate();
+        Map<JobID, ExecutionGraphInfo> jobs = new HashMap<>(numJobs);
+        for (int i = 0; i < numJobs; i++) {
+            ExecutionGraphInfo executionGraphInfo = createExecutionGraphInfo();
+            jobs.put(executionGraphInfo.getJobId(), executionGraphInfo);
+        }
+        return new ArchivedApplication(
+                applicationId,
+                "test-application",
+                ApplicationState.FINISHED,
+                new long[ApplicationState.values().length],
+                jobs);
+    }
+
+    private ExecutionGraphInfo createExecutionGraphInfo() {
+        return new ExecutionGraphInfo(
+                ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
+                        JobID.generate(), "test-job", JobStatus.FINISHED, 
null, null, null, 0));
+    }
+
+    private void deleteApplicationArchiveDir(ApplicationID applicationId) throws IOException {
+        // set cluster id to application id to simplify the test
+        clusterConfig.set(ClusterOptions.CLUSTER_ID, applicationId.toString());
+        org.apache.flink.core.fs.Path applicationArchiveDir =
+                ArchivePathUtils.getApplicationArchivePath(clusterConfig, applicationId)
+                        .getParent();
+        applicationArchiveDir.getFileSystem().delete(applicationArchiveDir, true);
     }
 
     private static final class JsonObject implements AutoCloseable {
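
For orientation, the archive layout these tests exercise can be sketched from
the two ArchivePathUtils calls used above (only those calls are taken from the
diff; the surrounding variables are assumed to be set up as in the tests):

    // Resolve where an application archive and one of its job archives live.
    org.apache.flink.core.fs.Path appArchive =
            ArchivePathUtils.getApplicationArchivePath(clusterConfig, applicationId);
    org.apache.flink.core.fs.Path jobArchive =
            ArchivePathUtils.getJobArchivePath(clusterConfig, jobId, applicationId);
    // deleteApplicationArchiveDir() deletes appArchive.getParent() recursively,
    // removing the application archive together with the jobs stored under it.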
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategyTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategyTest.java
similarity index 64%
rename from flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategyTest.java
rename to flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategyTest.java
index e8983967df5..d1d6124b357 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategyTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategyTest.java
@@ -18,31 +18,49 @@
 
 package org.apache.flink.runtime.webmonitor.history.retaining;
 
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.Path;
 
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.time.Duration;
 import java.time.Instant;
+import java.util.function.Function;
+import java.util.stream.Stream;
 
+import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS;
 import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS;
 import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_TTL;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Testing for {@link CompositeJobRetainedStrategy}. */
-class CompositeJobRetainedStrategyTest {
+/** Testing for {@link CompositeArchiveRetainedStrategy}. */
+class CompositeArchiveRetainedStrategyTest {
+
+    private static Stream<TestCase> getTestCases() {
+        return Stream.of(
+                new TestCase(
+                        "Legacy Jobs",
+                        HISTORY_SERVER_RETAINED_JOBS,
+                        CompositeArchiveRetainedStrategy::createForJobFromConfig),
+                new TestCase(
+                        "Applications",
+                        HISTORY_SERVER_RETAINED_APPLICATIONS,
+                        CompositeArchiveRetainedStrategy::createForApplicationFromConfig));
+    }
 
-    @Test
-    void testTimeToLiveBasedJobRetainedStrategy() {
+    @ParameterizedTest(name = "{index}: {0}")
+    @MethodSource("getTestCases")
+    void testTimeToLiveBasedArchiveRetainedStrategy(TestCase testCase) {
         final Configuration conf = new Configuration();
 
         // Test for invalid option value.
         conf.set(HISTORY_SERVER_RETAINED_TTL, Duration.ZERO);
-        assertThatThrownBy(() -> CompositeJobRetainedStrategy.createFrom(conf))
+        assertThatThrownBy(() -> testCase.createStrategy(conf))
                 .isInstanceOf(IllegalConfigurationException.class);
         // Skipped for option value that is less than 0 milliseconds, which will throw a
         // java.lang.NumberFormatException caused by TimeUtils.
@@ -51,7 +69,7 @@ class CompositeJobRetainedStrategyTest {
 
         // Test the case where no specific retention policy is configured, i.e., all archived files
         // are retained.
-        JobRetainedStrategy strategy = CompositeJobRetainedStrategy.createFrom(conf);
+        ArchiveRetainedStrategy strategy = testCase.createStrategy(conf);
         assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
         assertThat(
                         strategy.shouldRetain(
@@ -63,7 +81,7 @@ class CompositeJobRetainedStrategyTest {
 
         // Test the case where only the TTL-based retention policy is specified.
         conf.set(HISTORY_SERVER_RETAINED_TTL, Duration.ofMinutes(1L));
-        strategy = CompositeJobRetainedStrategy.createFrom(conf);
+        strategy = testCase.createStrategy(conf);
         assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
         assertThat(
                         strategy.shouldRetain(
@@ -74,35 +92,37 @@ class CompositeJobRetainedStrategyTest {
                 .isFalse();
     }
 
-    @Test
-    void testQuantityBasedJobRetainedStrategy() {
+    @ParameterizedTest(name = "{index}: {0}")
+    @MethodSource("getTestCases")
+    void testQuantityBasedArchiveRetainedStrategy(TestCase testCase) {
         final Configuration conf = new Configuration();
 
         // Test for invalid option value.
-        conf.set(HISTORY_SERVER_RETAINED_JOBS, 0);
-        assertThatThrownBy(() -> CompositeJobRetainedStrategy.createFrom(conf))
+        conf.set(testCase.getQuantityConfigOption(), 0);
+        assertThatThrownBy(() -> testCase.createStrategy(conf))
                 .isInstanceOf(IllegalConfigurationException.class);
-        conf.set(HISTORY_SERVER_RETAINED_JOBS, -2);
-        assertThatThrownBy(() -> CompositeJobRetainedStrategy.createFrom(conf))
+        conf.set(testCase.getQuantityConfigOption(), -2);
+        assertThatThrownBy(() -> testCase.createStrategy(conf))
                 .isInstanceOf(IllegalConfigurationException.class);
 
-        conf.removeConfig(HISTORY_SERVER_RETAINED_JOBS);
+        conf.removeConfig(testCase.getQuantityConfigOption());
 
         // Test the case where no specific retention policy is configured, 
i.e., all archived files
         // are retained.
-        JobRetainedStrategy strategy = CompositeJobRetainedStrategy.createFrom(conf);
+        ArchiveRetainedStrategy strategy = testCase.createStrategy(conf);
         assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
         assertThat(strategy.shouldRetain(new TestingFileStatus(), 3)).isTrue();
 
         // Test the case where only the QUANTITY-based retention policy is specified.
-        conf.set(HISTORY_SERVER_RETAINED_JOBS, 2);
-        strategy = CompositeJobRetainedStrategy.createFrom(conf);
+        conf.set(testCase.getQuantityConfigOption(), 2);
+        strategy = testCase.createStrategy(conf);
         assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
         assertThat(strategy.shouldRetain(new TestingFileStatus(), 3)).isFalse();
     }
 
-    @Test
-    void testCompositeBasedJobRetainedStrategy() {
+    @ParameterizedTest(name = "{index}: {0}")
+    @MethodSource("getTestCases")
+    void testCompositeBasedArchiveRetainedStrategy(TestCase testCase) {
 
         final long outOfTtlMillis =
                 Instant.now().toEpochMilli() - Duration.ofMinutes(2L).toMillis();
@@ -110,7 +130,7 @@ class CompositeJobRetainedStrategyTest {
         // Test the case where no specific retention policy is configured, i.e., all archived files
         // are retained.
         final Configuration conf = new Configuration();
-        JobRetainedStrategy strategy = CompositeJobRetainedStrategy.createFrom(conf);
+        ArchiveRetainedStrategy strategy = testCase.createStrategy(conf);
         assertThat(strategy.shouldRetain(new TestingFileStatus(outOfTtlMillis), 1)).isTrue();
         assertThat(strategy.shouldRetain(new TestingFileStatus(), 10)).isTrue();
         assertThat(strategy.shouldRetain(new TestingFileStatus(outOfTtlMillis), 3)).isTrue();
@@ -118,8 +138,8 @@ class CompositeJobRetainedStrategyTest {
 
         // Test the case where both retention policies are specified.
         conf.set(HISTORY_SERVER_RETAINED_TTL, Duration.ofMinutes(1));
-        conf.set(HISTORY_SERVER_RETAINED_JOBS, 2);
-        strategy = CompositeJobRetainedStrategy.createFrom(conf);
+        conf.set(testCase.getQuantityConfigOption(), 2);
+        strategy = testCase.createStrategy(conf);
         assertThat(strategy.shouldRetain(new TestingFileStatus(outOfTtlMillis), 1)).isFalse();
         assertThat(strategy.shouldRetain(new TestingFileStatus(), 10)).isFalse();
         assertThat(strategy.shouldRetain(new TestingFileStatus(outOfTtlMillis), 3)).isFalse();
@@ -173,4 +193,32 @@ class CompositeJobRetainedStrategyTest {
             return null;
         }
     }
+
+    private static final class TestCase {
+        private final String testName;
+        private final ConfigOption<Integer> quantityConfigOption;
+        private final Function<Configuration, ArchiveRetainedStrategy> strategyFunction;
+
+        TestCase(
+                String testName,
+                ConfigOption<Integer> quantityConfigOption,
+                Function<Configuration, ArchiveRetainedStrategy> strategyFunction) {
+            this.testName = testName;
+            this.quantityConfigOption = quantityConfigOption;
+            this.strategyFunction = strategyFunction;
+        }
+
+        ArchiveRetainedStrategy createStrategy(Configuration conf) {
+            return strategyFunction.apply(conf);
+        }
+
+        ConfigOption<Integer> getQuantityConfigOption() {
+            return quantityConfigOption;
+        }
+
+        @Override
+        public String toString() {
+            return testName;
+        }
+    }
 }
