[
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16173179#comment-16173179
]
ASF GitHub Bot commented on FLINK-7068:
---------------------------------------
GitHub user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4358#discussion_r139969076
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides access to transient BLOB files stored at the {@link BlobServer}.
+ *
+ * TODO: currently, this is still cache-based with local copies - make this truly transient, i.e. return file streams with no local copy
+ */
+public class TransientBlobCache implements TransientBlobService {
+
+ /** The log object used for debugging. */
+ private static final Logger LOG = LoggerFactory.getLogger(TransientBlobCache.class);
+
+ /** Counter to generate unique names for temporary files. */
+ private final AtomicLong tempFileCounter = new AtomicLong(0);
+
+ private final InetSocketAddress serverAddress;
+
+ /**
+ * Root directory for local file storage
+ */
+ private final File storageDir;
+
+ private final AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+ /** Shutdown hook thread to ensure deletion of the local storage directory. */
+ private final Thread shutdownHook;
+
+ /** The number of retries when the transfer fails */
+ private final int numFetchRetries;
+
+ /** Configuration for the blob client like ssl parameters required to connect to the blob server */
+ private final Configuration blobClientConfig;
+
+ /** Lock guarding concurrent file accesses */
+ private final ReadWriteLock readWriteLock;
+
+ /**
+ * Instantiates a new BLOB cache.
+ *
+ * @param serverAddress
+ * address of the {@link BlobServer} to use for fetching files from
+ * @param blobClientConfig
+ * global configuration
+ *
+ * @throws IOException
+ * thrown if the (local or distributed) file storage cannot be created or is not usable
+ */
+ public TransientBlobCache(
+ final InetSocketAddress serverAddress,
+ final Configuration blobClientConfig) throws IOException {
--- End diff --
Yes, that's mostly why. Passing every parameter that the `BlobClient`
requires individually would also be cumbersome and error-prone.
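To illustrate the trade-off, here is a minimal sketch in plain Java, not the actual Flink code. `SketchConfig`, `SketchCache`, and the `sketch.*` keys are hypothetical names made up for this example; they only mimic the idea of handing one configuration object to the cache so that client settings (retries, SSL, and so on) do not have to be threaded through as separate constructor arguments.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a configuration object; this is NOT Flink's Configuration class. */
final class SketchConfig {
    private final Map<String, String> values = new HashMap<>();

    /** Stores a raw string value and returns this object to allow chaining. */
    SketchConfig set(String key, String value) {
        values.put(key, value);
        return this;
    }

    /** Returns the value parsed as an int, or the default if the key is absent. */
    int getInt(String key, int defaultValue) {
        String raw = values.get(key);
        return raw == null ? defaultValue : Integer.parseInt(raw);
    }

    /** Returns the value parsed as a boolean, or the default if the key is absent. */
    boolean getBoolean(String key, boolean defaultValue) {
        String raw = values.get(key);
        return raw == null ? defaultValue : Boolean.parseBoolean(raw);
    }
}

/** Hypothetical cache that receives one config object instead of many separate arguments. */
final class SketchCache {
    final int numFetchRetries;
    final boolean sslEnabled;

    SketchCache(SketchConfig config) {
        // Assumed keys and defaults, chosen only for this illustration.
        this.numFetchRetries = config.getInt("sketch.fetch.retries", 5);
        this.sslEnabled = config.getBoolean("sketch.ssl.enabled", false);
    }
}
```

With this shape, adding another client setting later only touches the configuration lookup, not every call site of the constructor.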
> change BlobService sub-classes for permanent and transient BLOBs
> ----------------------------------------------------------------
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, Network
> Affects Versions: 1.4.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should cover use cases for BLOBs that are
> permanently stored for a job's lifetime (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc.,
> which does not even have to be backed by files.