[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410990#comment-16410990
 ] 

ASF GitHub Bot commented on FLINK-8620:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176663553
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java
 ---
    @@ -57,89 +65,59 @@
                + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs 
Land, vom Land\n"
                + "aufs Meer, und bilden wuetend eine Kette Der tiefsten 
Wirkung rings umher.\n"
                + "Da flammt ein blitzendes Verheeren Dem Pfade vor des 
Donnerschlags. Doch\n"
    -           + "deine Boten, Herr, verehren Das sanfte Wandeln deines 
Tags.\n";
    +           + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
     
        @Rule
        public final TemporaryFolder temporaryFolder = new TemporaryFolder();
     
        private FileCache fileCache;
    -   private File f;
     
    -   @Before
    -   public void setup() throws IOException {
    -           String[] tmpDirectories = new 
String[]{temporaryFolder.newFolder().getAbsolutePath()};
    -           try {
    -                   fileCache = new FileCache(tmpDirectories);
    -           }
    -           catch (Exception e) {
    -                   e.printStackTrace();
    -                   fail("Cannot create FileCache: " + e.getMessage());
    +   private final PermanentBlobKey permanentBlobKey = new 
PermanentBlobKey();
    +
    +   private final PermanentBlobService blobService = new 
PermanentBlobService() {
    +           @Override
    +           public File getFile(JobID jobId, PermanentBlobKey key) throws 
IOException {
    +                   if (key.equals(permanentBlobKey)) {
    +                           File f = temporaryFolder.newFile("cacheFile");
    +                           FileUtils.writeFileUtf8(f, testFileContent);
    +                           return f;
    +                   } else {
    +                           throw new IllegalArgumentException("This 
service contains only entry for " + permanentBlobKey);
    +                   }
                }
     
    -           f = temporaryFolder.newFile("cacheFile");
    -           try {
    -                   Files.write(testFileContent, f, Charsets.UTF_8);
    -           }
    -           catch (Exception e) {
    -                   e.printStackTrace();
    -                   fail("Error initializing the test: " + e.getMessage());
    +           @Override
    +           public void close() throws IOException {
    +
                }
    +   };
    +
    +   @Before
    +   public void setup() throws Exception {
    +           fileCache = new FileCache(new 
String[]{temporaryFolder.newFolder().getAbsolutePath()}, blobService);
        }
     
        @After
        public void shutdown() {
    -           try {
    -                   fileCache.shutdown();
    -           }
    -           catch (Exception e) {
    -                   e.printStackTrace();
    -                   fail("FileCache shutdown failed: " + e.getMessage());
    -           }
    +           fileCache.shutdown();
        }
     
        @Test
    -   public void testFileReuseForNextTask() {
    -           try {
    -                   final JobID jobID = new JobID();
    -                   final String fileName = "test_file";
    -
    -                   final String filePath = f.toURI().toString();
    -
    -                   // copy / create the file
    -                   Future<Path> copyResult = 
fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false), 
jobID);
    -                   copyResult.get();
    -
    -                   // get another reference to the file
    -                   Future<Path> copyResult2 = 
fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false), 
jobID);
    -
    -                   // this should be available immediately
    -                   assertTrue(copyResult2.isDone());
    -
    -                   // delete the file
    -                   fileCache.deleteTmpFile(fileName, jobID);
    -                   // file should not yet be deleted
    -                   assertTrue(fileCache.holdsStillReference(fileName, 
jobID));
    -
    -                   // delete the second reference
    -                   fileCache.deleteTmpFile(fileName, jobID);
    -                   // file should still not be deleted, but remain for a 
bit
    -                   assertTrue(fileCache.holdsStillReference(fileName, 
jobID));
    -
    -                   fileCache.createTmpFile(fileName, new 
DistributedCacheEntry(filePath, false), jobID);
    -                   fileCache.deleteTmpFile(fileName, jobID);
    -
    -                   // after a while, the file should disappear
    -                   long deadline = System.currentTimeMillis() + 20000;
    -                   do {
    -                           Thread.sleep(5500);
    -                   }
    -                   while (fileCache.holdsStillReference(fileName, jobID) 
&& System.currentTimeMillis() < deadline);
    -
    -                   assertFalse(fileCache.holdsStillReference(fileName, 
jobID));
    -           }
    -           catch (Exception e) {
    -                   e.printStackTrace();
    -                   fail(e.getMessage());
    -           }
    +   public void testFileDownloadedFromBlob() throws Exception {
    +           JobID jobID = new JobID();
    +
    +           final String fileName = "test_file";
    +           // copy / create the file
    +           final DistributedCache.DistributedCacheEntry entry = new 
DistributedCache.DistributedCacheEntry(
    +                   fileName,
    +                   false,
    +                   InstantiationUtil.serializeObject(permanentBlobKey));
    +           Future<Path> copyResult = fileCache.createTmpFile(fileName, 
entry, jobID);
    +
    +           final Path dstPath = copyResult.get();
    +           final String actualContent =
    +                   StringUtils.join(Files.readLines(new 
File(dstPath.toUri()), StandardCharsets.UTF_8), "\n");
    --- End diff --
    
    use `Files.readLines(new 
File(dstPath.toUri()).collect(Collections.joining("\n")` instead?


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-8620
>                 URL: https://issues.apache.org/jira/browse/FLINK-8620
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to