[ https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16425177#comment-16425177 ]
ASF GitHub Bot commented on FLINK-8620:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5580#discussion_r179057166
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
@@ -262,106 +216,120 @@ private static Thread createShutdownHook(final
FileCache cache, final Logger log
);
}
+ public void releaseJob(JobID jobId, ExecutionAttemptID executionId) {
+ checkNotNull(jobId);
+
+ synchronized (lock) {
+ Set<ExecutionAttemptID> jobRefCounter =
jobRefHolders.get(jobId);
+
+ if (jobRefCounter == null || jobRefCounter.isEmpty()) {
+ LOG.warn("improper use of releaseJob() without
a matching number of createTmpFiles() calls for jobId " + jobId);
+ return;
+ }
+
+ jobRefCounter.remove(executionId);
+ if (jobRefCounter.isEmpty()) {
+ executorService.schedule(new
DeleteProcess(jobId), cleanupInterval, TimeUnit.SECONDS);
+ }
+ }
+ }
+
//
------------------------------------------------------------------------
// background processes
//
------------------------------------------------------------------------
/**
- * Asynchronous file copy process.
+ * Asynchronous file copy process from blob server.
*/
- private static class CopyProcess implements Callable<Path> {
+ private static class CopyFromBlobProcess implements Callable<Path> {
- private final Path filePath;
- private final Path cachedPath;
- private boolean executable;
+ private final PermanentBlobKey blobKey;
+ private final Path target;
+ private final boolean isDirectory;
+ private final boolean isExecutable;
+ private final JobID jobID;
+ private final PermanentBlobService blobService;
- public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
- this.filePath = new Path(e.filePath);
- this.executable = e.isExecutable;
- this.cachedPath = cachedPath;
+ CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID,
PermanentBlobService blobService, Path target) {
+ try {
+ this.isExecutable = e.isExecutable;
+ this.isDirectory = e.isZipped;
+ this.jobID = jobID;
+ this.blobService = blobService;
+ this.blobKey =
InstantiationUtil.deserializeObject(e.blobKey,
Thread.currentThread().getContextClassLoader());
+ this.target = target;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
}
@Override
public Path call() throws IOException {
- // let exceptions propagate. we can retrieve them later
from
- // the future and report them upon access to the result
- copy(filePath, cachedPath, this.executable);
- return cachedPath;
+ final File file = blobService.getFile(jobID, blobKey);
+
+ if (isDirectory) {
+ try (ZipInputStream zis = new
ZipInputStream(new FileInputStream(file))) {
+ ZipEntry entry;
+ while ((entry = zis.getNextEntry()) !=
null) {
+ String fileName =
entry.getName();
+ Path newFile = new Path(target,
fileName);
+ if (entry.isDirectory()) {
+
target.getFileSystem().mkdirs(newFile);
+ } else {
+ try (FSDataOutputStream
fsDataOutputStream = target.getFileSystem()
+
.create(newFile, FileSystem.WriteMode.NO_OVERWRITE)) {
+
IOUtils.copyBytes(zis, fsDataOutputStream, false);
+ }
+ //noinspection
ResultOfMethodCallIgnored
+ new
File(newFile.getPath()).setExecutable(isExecutable);
+ }
+ zis.closeEntry();
+ }
+ }
+ Files.delete(file.toPath());
+ return target;
+ } else {
+ //noinspection ResultOfMethodCallIgnored
+ file.setExecutable(isExecutable);
+ return Path.fromLocalFile(file);
+ }
+
}
}
/**
* If no task is using this file after 5 seconds, clear it.
*/
- private static class DeleteProcess implements Runnable {
-
- private final Object lock;
- private final Map<JobID, Map<String, Tuple4<Integer, File,
Path, Future<Path>>>> entries;
+ private class DeleteProcess implements Runnable {
- private final String name;
private final JobID jobID;
- public DeleteProcess(Object lock, Map<JobID, Map<String,
Tuple4<Integer, File, Path, Future<Path>>>> entries,
- String name,
JobID jobID) {
- this.lock = lock;
- this.entries = entries;
- this.name = name;
+ DeleteProcess(JobID jobID) {
this.jobID = jobID;
}
@Override
public void run() {
try {
synchronized (lock) {
- Map<String, Tuple4<Integer, File, Path,
Future<Path>>> jobEntries = entries.get(jobID);
- if (jobEntries != null) {
- Tuple4<Integer, File, Path,
Future<Path>> entry = jobEntries.get(name);
+ if (jobRefHolders.get(jobID).isEmpty())
{
+ // abort the copy
+ for (Future<Path> fileFuture :
entries.get(jobID).values()) {
+ fileFuture.cancel(true);
+ }
- if (entry != null) {
- int count = entry.f0;
- if (count > 1) {
- // multiple
references still
- entry.f0 =
count - 1;
- }
- else {
- // we remove
the last reference
-
jobEntries.remove(name);
- if
(jobEntries.isEmpty()) {
-
entries.remove(jobID);
- }
-
- // abort the
copy
-
entry.f3.cancel(true);
-
- // remove the
file
- File file = new
File(entry.f2.toString());
- if
(file.exists()) {
- if
(file.isDirectory()) {
-
FileUtils.deleteDirectory(file);
- }
- else if
(!file.delete()) {
-
LOG.error("Could not delete locally cached file " + file.getAbsolutePath());
- }
- }
-
- // remove the
job wide temp directory, if it is now empty
- File parent =
entry.f1;
- if
(parent.isDirectory()) {
-
String[] children = parent.list();
- if
(children == null || children.length == 0) {
-
//noinspection ResultOfMethodCallIgnored
-
parent.delete();
- }
- }
- }
+ // remove the job wide temp
directories
+ for (File storageDirectory :
storageDirectories) {
+ File tempDir = new
File(storageDirectory, jobID.toString());
+
FileUtils.deleteDirectory(tempDir);
}
}
}
- }
- catch (IOException e) {
+ } catch (IOException e) {
LOG.error("Could not delete file from local
file cache.", e);
}
}
}
+
--- End diff --
revert
> Enable shipping custom artifacts to BlobStore and accessing them through
> DistributedCache
> -----------------------------------------------------------------------------------------
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
> Issue Type: New Feature
> Reporter: Dawid Wysakowicz
> Assignee: Dawid Wysakowicz
> Priority: Major
>
> We should be able to distribute custom files to TaskManagers. To do that, we
> can store those files in the BlobStore and later access them on the TaskManagers
> through the DistributedCache.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)