GitHub user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4358#discussion_r139966738
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
---
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides a cache for permanent BLOB files including a per-job ref-counting and a staged cleanup.
+ * <p>
+ * When requesting BLOBs via {@link #getHAFile(JobID, BlobKey)}, the cache will first attempt to
+ * serve the file from its local cache. Only if the local cache does not contain the desired BLOB,
+ * it will try to download it from a distributed HA file system (if available) or the BLOB server.
+ * <p>
+ * If files for a job are not needed any more, they will enter a staged, i.e. deferred, cleanup.
+ * Files may thus still be accessible upon recovery and do not need to be re-downloaded.
+ */
+public class PermanentBlobCache extends TimerTask implements PermanentBlobService {
--- End diff --
The latter is not possible because each class offers its own distinct
methods, e.g. `TransientBlobCache` allows deleting BLOBs, while
`PermanentBlobCache` does not. I can introduce a common base class, though.
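
A rough sketch of how such a common base class could be split, assuming the shared part is the "serve locally, otherwise fetch" lookup. All class and method names here (`BlobCacheBaseSketch`, `getInternal`, `fetchRemote`, the string keys) are hypothetical and only illustrate the idea; they are not the actual Flink classes or signatures:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical base class: holds only the behaviour both caches share.
abstract class BlobCacheBaseSketch {
    // In-memory stand-in for the local cache directory.
    protected final Map<String, byte[]> localStore = new ConcurrentHashMap<>();

    // Shared lookup: serve from the local store, otherwise fetch and remember.
    protected byte[] getInternal(String key) {
        return localStore.computeIfAbsent(key, this::fetchRemote);
    }

    // Stand-in for downloading from the HA file system or the BLOB server.
    protected abstract byte[] fetchRemote(String key);
}

// Permanent variant: exposes retrieval only, no deletion.
final class PermanentCacheSketch extends BlobCacheBaseSketch {
    byte[] getHAFile(String key) {
        return getInternal(key);
    }

    @Override
    protected byte[] fetchRemote(String key) {
        return ("permanent:" + key).getBytes(StandardCharsets.UTF_8);
    }
}

// Transient variant: adds deletion on top of the shared lookup.
final class TransientCacheSketch extends BlobCacheBaseSketch {
    byte[] getFile(String key) {
        return getInternal(key);
    }

    // Returns true if a locally cached entry was removed.
    boolean delete(String key) {
        return localStore.remove(key) != null;
    }

    @Override
    protected byte[] fetchRemote(String key) {
        return ("transient:" + key).getBytes(StandardCharsets.UTF_8);
    }
}
```

This way the public surface of each cache stays distinct (only the transient one offers `delete`), while the lookup logic lives in one place.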
---