[
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16411448#comment-16411448
]
ASF GitHub Bot commented on FLINK-8620:
---------------------------------------
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/5580#discussion_r176747071
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
@@ -267,101 +208,60 @@ private static Thread createShutdownHook(final
FileCache cache, final Logger log
//
------------------------------------------------------------------------
/**
- * Asynchronous file copy process.
- */
- private static class CopyProcess implements Callable<Path> {
-
- private final Path filePath;
- private final Path cachedPath;
- private boolean executable;
-
- public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
- this.filePath = new Path(e.filePath);
- this.executable = e.isExecutable;
- this.cachedPath = cachedPath;
- }
-
- @Override
- public Path call() throws IOException {
- // let exceptions propagate. we can retrieve them later
from
- // the future and report them upon access to the result
- copy(filePath, cachedPath, this.executable);
- return cachedPath;
- }
- }
-
- /**
- * If no task is using this file after 5 seconds, clear it.
+ * Asynchronous file copy process from blob server.
*/
- private static class DeleteProcess implements Runnable {
+ private static class CopyFromBlobProcess implements Callable<Path> {
- private final Object lock;
- private final Map<JobID, Map<String, Tuple4<Integer, File,
Path, Future<Path>>>> entries;
-
- private final String name;
+ private final PermanentBlobKey blobKey;
+ private final Path target;
+ private final boolean directory;
+ private final boolean executable;
private final JobID jobID;
+ private final PermanentBlobService blobService;
- public DeleteProcess(Object lock, Map<JobID, Map<String,
Tuple4<Integer, File, Path, Future<Path>>>> entries,
- String name,
JobID jobID) {
- this.lock = lock;
- this.entries = entries;
- this.name = name;
- this.jobID = jobID;
+ CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID,
PermanentBlobService blobService, Path target) {
+ try {
+ this.executable = e.isExecutable;
+ this.directory = e.isZipped;
+ this.jobID = jobID;
+ this.blobService = blobService;
+ this.blobKey =
InstantiationUtil.deserializeObject(e.blobKey,
Thread.currentThread().getContextClassLoader());
+ this.target = target;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
}
@Override
- public void run() {
- try {
- synchronized (lock) {
- Map<String, Tuple4<Integer, File, Path,
Future<Path>>> jobEntries = entries.get(jobID);
-
- if (jobEntries != null) {
- Tuple4<Integer, File, Path,
Future<Path>> entry = jobEntries.get(name);
-
- if (entry != null) {
- int count = entry.f0;
- if (count > 1) {
- // multiple
references still
- entry.f0 =
count - 1;
- }
- else {
- // we remove
the last reference
-
jobEntries.remove(name);
- if
(jobEntries.isEmpty()) {
-
entries.remove(jobID);
- }
-
- // abort the
copy
-
entry.f3.cancel(true);
-
- // remove the
file
- File file = new
File(entry.f2.toString());
- if
(file.exists()) {
- if
(file.isDirectory()) {
-
FileUtils.deleteDirectory(file);
- }
- else if
(!file.delete()) {
-
LOG.error("Could not delete locally cached file " + file.getAbsolutePath());
- }
- }
-
- // remove the
job wide temp directory, if it is now empty
- File parent =
entry.f1;
- if
(parent.isDirectory()) {
-
String[] children = parent.list();
- if
(children == null || children.length == 0) {
-
//noinspection ResultOfMethodCallIgnored
-
parent.delete();
- }
- }
+ public Path call() throws IOException {
+ final File file = blobService.getFile(jobID, blobKey);
+
+ if (directory) {
+ try (ZipInputStream zis = new
ZipInputStream(new FileInputStream(file))) {
+ ZipEntry entry;
+ while ((entry = zis.getNextEntry()) !=
null) {
--- End diff --
It does not need to; `getNextEntry` already does that at the very beginning of the call.
> Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache
> -----------------------------------------------------------------------------------------
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
> Issue Type: New Feature
> Reporter: Dawid Wysakowicz
> Assignee: Dawid Wysakowicz
> Priority: Major
>
> We should be able to distribute custom files to the TaskManagers. To do that, we
> can store those files in the BlobStore and later access them on the TaskManagers
> through the DistributedCache.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)