[
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410990#comment-16410990
]
ASF GitHub Bot commented on FLINK-8620:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5580#discussion_r176663553
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java
---
@@ -57,89 +65,59 @@
+ "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs
Land, vom Land\n"
+ "aufs Meer, und bilden wuetend eine Kette Der tiefsten
Wirkung rings umher.\n"
+ "Da flammt ein blitzendes Verheeren Dem Pfade vor des
Donnerschlags. Doch\n"
- + "deine Boten, Herr, verehren Das sanfte Wandeln deines
Tags.\n";
+ + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
private FileCache fileCache;
- private File f;
- @Before
- public void setup() throws IOException {
- String[] tmpDirectories = new
String[]{temporaryFolder.newFolder().getAbsolutePath()};
- try {
- fileCache = new FileCache(tmpDirectories);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Cannot create FileCache: " + e.getMessage());
+ private final PermanentBlobKey permanentBlobKey = new
PermanentBlobKey();
+
+ private final PermanentBlobService blobService = new
PermanentBlobService() {
+ @Override
+ public File getFile(JobID jobId, PermanentBlobKey key) throws
IOException {
+ if (key.equals(permanentBlobKey)) {
+ File f = temporaryFolder.newFile("cacheFile");
+ FileUtils.writeFileUtf8(f, testFileContent);
+ return f;
+ } else {
+ throw new IllegalArgumentException("This
service contains only entry for " + permanentBlobKey);
+ }
}
- f = temporaryFolder.newFile("cacheFile");
- try {
- Files.write(testFileContent, f, Charsets.UTF_8);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Error initializing the test: " + e.getMessage());
+ @Override
+ public void close() throws IOException {
+
}
+ };
+
+ @Before
+ public void setup() throws Exception {
+ fileCache = new FileCache(new
String[]{temporaryFolder.newFolder().getAbsolutePath()}, blobService);
}
@After
public void shutdown() {
- try {
- fileCache.shutdown();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("FileCache shutdown failed: " + e.getMessage());
- }
+ fileCache.shutdown();
}
@Test
- public void testFileReuseForNextTask() {
- try {
- final JobID jobID = new JobID();
- final String fileName = "test_file";
-
- final String filePath = f.toURI().toString();
-
- // copy / create the file
- Future<Path> copyResult =
fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false),
jobID);
- copyResult.get();
-
- // get another reference to the file
- Future<Path> copyResult2 =
fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false),
jobID);
-
- // this should be available immediately
- assertTrue(copyResult2.isDone());
-
- // delete the file
- fileCache.deleteTmpFile(fileName, jobID);
- // file should not yet be deleted
- assertTrue(fileCache.holdsStillReference(fileName,
jobID));
-
- // delete the second reference
- fileCache.deleteTmpFile(fileName, jobID);
- // file should still not be deleted, but remain for a
bit
- assertTrue(fileCache.holdsStillReference(fileName,
jobID));
-
- fileCache.createTmpFile(fileName, new
DistributedCacheEntry(filePath, false), jobID);
- fileCache.deleteTmpFile(fileName, jobID);
-
- // after a while, the file should disappear
- long deadline = System.currentTimeMillis() + 20000;
- do {
- Thread.sleep(5500);
- }
- while (fileCache.holdsStillReference(fileName, jobID)
&& System.currentTimeMillis() < deadline);
-
- assertFalse(fileCache.holdsStillReference(fileName,
jobID));
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ public void testFileDownloadedFromBlob() throws Exception {
+ JobID jobID = new JobID();
+
+ final String fileName = "test_file";
+ // copy / create the file
+ final DistributedCache.DistributedCacheEntry entry = new
DistributedCache.DistributedCacheEntry(
+ fileName,
+ false,
+ InstantiationUtil.serializeObject(permanentBlobKey));
+ Future<Path> copyResult = fileCache.createTmpFile(fileName,
entry, jobID);
+
+ final Path dstPath = copyResult.get();
+ final String actualContent =
+ StringUtils.join(Files.readLines(new
File(dstPath.toUri()), StandardCharsets.UTF_8), "\n");
--- End diff --
use `Files.readLines(new
File(dstPath.toUri()).collect(Collections.joining("\n")` instead?
> Enable shipping custom artifacts to BlobStore and accessing them through
> DistributedCache
> -----------------------------------------------------------------------------------------
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
> Issue Type: New Feature
> Reporter: Dawid Wysakowicz
> Assignee: Dawid Wysakowicz
> Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we
> can store those files in BlobStore and later on access them in TaskManagers
> through DistributedCache.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)