Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6187#discussion_r197112144
  
    --- Diff: 
flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
 ---
    @@ -206,4 +222,54 @@ public boolean filter(Tuple2<Long, Long> value) throws 
Exception {
                
assertTrue(sinkVertex.getPreferredResources().equals(resource6));
                
assertTrue(iterationSyncVertex.getMinResources().equals(resource3));
        }
    +
    +   @Test
    +   public void testArtifactCompression() throws IOException {
    +           Path plainFile1 = tmp.newFile("plainFile1").toPath();
    +           Path plainFile2 = tmp.newFile("plainFile2").toPath();
    +
    +           Path directory1 = tmp.newFolder("directory1").toPath();
    +           Files.createDirectory(directory1.resolve("containedFile1"));
    +
    +           Path directory2 = tmp.newFolder("directory2").toPath();
    +           Files.createDirectory(directory2.resolve("containedFile2"));
    +
    +           JobGraph jb = new JobGraph();
    +
    +           final String executableFileName = "executableFile";
    +           final String nonExecutableFileName = "nonExecutableFile";
    +           final String executableDirName = "executableDir";
    +           final String nonExecutableDirName = "nonExecutableDIr";
    +
    +           Collection<Tuple2<String, 
DistributedCache.DistributedCacheEntry>> originalArtifacts = Arrays.asList(
    +                   Tuple2.of(executableFileName, new 
DistributedCache.DistributedCacheEntry(plainFile1.toString(), true)),
    +                   Tuple2.of(nonExecutableFileName, new 
DistributedCache.DistributedCacheEntry(plainFile2.toString(), false)),
    +                   Tuple2.of(executableDirName, new 
DistributedCache.DistributedCacheEntry(directory1.toString(), true)),
    +                   Tuple2.of(nonExecutableDirName, new 
DistributedCache.DistributedCacheEntry(directory2.toString(), false))
    +           );
    +
    +           JobGraphGenerator.addUserArtifactEntries(originalArtifacts, jb);
    +
    +           Map<String, DistributedCache.DistributedCacheEntry> 
submittedArtifacts = jb.getUserArtifacts();
    +
    +           DistributedCache.DistributedCacheEntry executableFileEntry = 
submittedArtifacts.get(executableFileName);
    +           assertState(executableFileEntry, true, false);
    +
    +           DistributedCache.DistributedCacheEntry nonExecutableFileEntry = 
submittedArtifacts.get(nonExecutableFileName);
    +           assertState(nonExecutableFileEntry, false, false);
    +
    +           DistributedCache.DistributedCacheEntry executableDirEntry = 
submittedArtifacts.get(executableDirName);
    +           assertState(executableDirEntry, true, true);
    +
    +           DistributedCache.DistributedCacheEntry nonExecutableDirEntry = 
submittedArtifacts.get(nonExecutableDirName);
    +           assertState(nonExecutableDirEntry, false, true);
    +   }
    --- End diff --
    
    That's implicitly checked by 
`assertFalse(filePath.getFileSystem().getFileStatus(filePath).isDir());`, which 
throws an exception for a non-existent file.


---

Reply via email to