Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6187#discussion_r197089159
--- Diff:
flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
---
@@ -206,4 +222,54 @@ public boolean filter(Tuple2<Long, Long> value) throws
Exception {
assertTrue(sinkVertex.getPreferredResources().equals(resource6));
assertTrue(iterationSyncVertex.getMinResources().equals(resource3));
}
+
+ @Test
+ public void testArtifactCompression() throws IOException {
+ Path plainFile1 = tmp.newFile("plainFile1").toPath();
+ Path plainFile2 = tmp.newFile("plainFile2").toPath();
+
+ Path directory1 = tmp.newFolder("directory1").toPath();
+ Files.createDirectory(directory1.resolve("containedFile1"));
+
+ Path directory2 = tmp.newFolder("directory2").toPath();
+ Files.createDirectory(directory2.resolve("containedFile2"));
+
+ JobGraph jb = new JobGraph();
+
+ final String executableFileName = "executableFile";
+ final String nonExecutableFileName = "nonExecutableFile";
+ final String executableDirName = "executableDir";
+ final String nonExecutableDirName = "nonExecutableDIr";
+
+ Collection<Tuple2<String,
DistributedCache.DistributedCacheEntry>> originalArtifacts = Arrays.asList(
+ Tuple2.of(executableFileName, new
DistributedCache.DistributedCacheEntry(plainFile1.toString(), true)),
+ Tuple2.of(nonExecutableFileName, new
DistributedCache.DistributedCacheEntry(plainFile2.toString(), false)),
+ Tuple2.of(executableDirName, new
DistributedCache.DistributedCacheEntry(directory1.toString(), true)),
+ Tuple2.of(nonExecutableDirName, new
DistributedCache.DistributedCacheEntry(directory2.toString(), false))
+ );
+
+ JobGraphGenerator.addUserArtifactEntries(originalArtifacts, jb);
+
+ Map<String, DistributedCache.DistributedCacheEntry>
submittedArtifacts = jb.getUserArtifacts();
+
+ DistributedCache.DistributedCacheEntry executableFileEntry =
submittedArtifacts.get(executableFileName);
+ assertState(executableFileEntry, true, false);
+
+ DistributedCache.DistributedCacheEntry nonExecutableFileEntry =
submittedArtifacts.get(nonExecutableFileName);
+ assertState(nonExecutableFileEntry, false, false);
+
+ DistributedCache.DistributedCacheEntry executableDirEntry =
submittedArtifacts.get(executableDirName);
+ assertState(executableDirEntry, true, true);
+
+ DistributedCache.DistributedCacheEntry nonExecutableDirEntry =
submittedArtifacts.get(nonExecutableDirName);
+ assertState(nonExecutableDirEntry, false, true);
+ }
--- End diff --
Should we also check whether the zip file has been created?
---