[
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188023#comment-16188023
]
ASF GitHub Bot commented on FLINK-7068:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4358#discussion_r142128462
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -18,89 +18,21 @@
package org.apache.flink.runtime.blob;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * The BLOB cache implements a local cache for content-addressable BLOBs.
- *
- * <p>When requesting BLOBs through the {@link BlobCache#getFile} methods, the
- * BLOB cache will first attempt to serve the file from its local cache. Only if
- * the local cache does not contain the desired BLOB, the BLOB cache will try to
- * download it from a distributed file system (if available) or the BLOB
- * server.</p>
+ * The BLOB cache provides access to BLOB services for permanent and transient BLOBs.
*/
-public class BlobCache extends TimerTask implements BlobService {
-
- /** The log object used for debugging. */
- private static final Logger LOG = LoggerFactory.getLogger(BlobCache.class);
-
- private final InetSocketAddress serverAddress;
-
- /** Root directory for local file storage */
- private final File storageDir;
-
- /** Blob store for distributed file storage, e.g. in HA */
- private final BlobView blobView;
-
- private final AtomicBoolean shutdownRequested = new AtomicBoolean();
-
- /** Shutdown hook thread to ensure deletion of the storage directory. */
- private final Thread shutdownHook;
-
- /** The number of retries when the transfer fails */
- private final int numFetchRetries;
-
- /** Configuration for the blob client like ssl parameters required to connect to the blob server */
- private final Configuration blobClientConfig;
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Job reference counters with a time-to-live (TTL).
- */
- private static class RefCount {
- /**
- * Number of references to a job.
- */
- public int references = 0;
-
- /**
- * Timestamp in milliseconds when any job data should be cleaned up (no cleanup for
- * non-positive values).
- */
- public long keepUntil = -1;
- }
-
- /** Map to store the number of references to a specific job */
- private final Map<JobID, RefCount> jobRefCounters = new HashMap<>();
+public class BlobCache implements BlobService {
- /** Time interval (ms) to run the cleanup task; also used as the default TTL. */
- private final long cleanupInterval;
+ /** Caching store for permanent BLOBs. */
+ private final PermanentBlobCache permanentBlobStore;
- private final Timer cleanupTimer;
+ /** Store for transient BLOB files. */
+ private final TransientBlobCache transientBlobStore;
--- End diff --
Same here.
> change BlobService sub-classes for permanent and transient BLOBs
> ----------------------------------------------------------------
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, Network
> Affects Versions: 1.4.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should represent the use cases for BLOBs that are
> permanently stored for a job's lifetime (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc.,
> which does not even have to be backed by files.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)