[ https://issues.apache.org/jira/browse/FLINK-9623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519236#comment-16519236 ]
ASF GitHub Bot commented on FLINK-9623:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6187#discussion_r197089159

--- Diff: flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java ---
@@ -206,4 +222,54 @@ public boolean filter(Tuple2<Long, Long> value) throws Exception {
 		assertTrue(sinkVertex.getPreferredResources().equals(resource6));
 		assertTrue(iterationSyncVertex.getMinResources().equals(resource3));
 	}
+
+	@Test
+	public void testArtifactCompression() throws IOException {
+		Path plainFile1 = tmp.newFile("plainFile1").toPath();
+		Path plainFile2 = tmp.newFile("plainFile2").toPath();
+
+		Path directory1 = tmp.newFolder("directory1").toPath();
+		Files.createDirectory(directory1.resolve("containedFile1"));
+
+		Path directory2 = tmp.newFolder("directory2").toPath();
+		Files.createDirectory(directory2.resolve("containedFile2"));
+
+		JobGraph jb = new JobGraph();
+
+		final String executableFileName = "executableFile";
+		final String nonExecutableFileName = "nonExecutableFile";
+		final String executableDirName = "executableDir";
+		final String nonExecutableDirName = "nonExecutableDIr";
+
+		Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> originalArtifacts = Arrays.asList(
+			Tuple2.of(executableFileName, new DistributedCache.DistributedCacheEntry(plainFile1.toString(), true)),
+			Tuple2.of(nonExecutableFileName, new DistributedCache.DistributedCacheEntry(plainFile2.toString(), false)),
+			Tuple2.of(executableDirName, new DistributedCache.DistributedCacheEntry(directory1.toString(), true)),
+			Tuple2.of(nonExecutableDirName, new DistributedCache.DistributedCacheEntry(directory2.toString(), false))
+		);
+
+		JobGraphGenerator.addUserArtifactEntries(originalArtifacts, jb);
+
+		Map<String, DistributedCache.DistributedCacheEntry> submittedArtifacts = jb.getUserArtifacts();
+
+		DistributedCache.DistributedCacheEntry executableFileEntry = submittedArtifacts.get(executableFileName);
+		assertState(executableFileEntry, true, false);
+
+		DistributedCache.DistributedCacheEntry nonExecutableFileEntry = submittedArtifacts.get(nonExecutableFileName);
+		assertState(nonExecutableFileEntry, false, false);
+
+		DistributedCache.DistributedCacheEntry executableDirEntry = submittedArtifacts.get(executableDirName);
+		assertState(executableDirEntry, true, true);
+
+		DistributedCache.DistributedCacheEntry nonExecutableDirEntry = submittedArtifacts.get(nonExecutableDirName);
+		assertState(nonExecutableDirEntry, false, true);
+	}
--- End diff --

Should we also check whether the zip file has been created?

> Move zipping logic out of blobservice
> -------------------------------------
>
>                 Key: FLINK-9623
>                 URL: https://issues.apache.org/jira/browse/FLINK-9623
>             Project: Flink
>          Issue Type: Improvement
>          Components: Job-Submission
>    Affects Versions: 1.6.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
>
> Directories given to the blob-service (primarily a use-case for the
> distributed cache) are currently silently zipped, and later unzipped by the
> {{FileCache}}. This tightly couples the zipping logic in the blob-service to
> the unzipping logic of the {{FileCache}}. The blob-service neither unzipped
> the directory if the blob was requested, nor did it provide any means of
> doing so manually, nor did it inform the user as to whether the requested
> blob is a zip or not.
> My conclusion is that the blob-service should not support directories _for
> now_, and that instead directories for the {{distributed cache}} should be
> explicitly zipped beforehand, given that this is the only use-case we have at
> the moment.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
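The issue text above proposes that directories for the distributed cache be "explicitly zipped beforehand" rather than silently zipped inside the blob-service. A minimal sketch of such explicit zipping with plain `java.util.zip` follows; the class and method names are illustrative assumptions for this comment, not the PR's actual implementation (Flink has its own file utilities for this):

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

/** Hypothetical helper: zips a directory up front, before handing it to the cache. */
public class ZipDirectorySketch {

    /**
     * Writes every regular file under {@code directory} into {@code targetZip},
     * storing entry names relative to the directory root (mirroring how an
     * unzipping FileCache would re-create the layout).
     */
    public static void zipDirectory(Path directory, Path targetZip) throws IOException {
        try (OutputStream out = Files.newOutputStream(targetZip);
             ZipOutputStream zos = new ZipOutputStream(out);
             Stream<Path> files = Files.walk(directory)) {
            for (Path file : (Iterable<Path>) files.filter(Files::isRegularFile)::iterator) {
                // relative path keeps the archive position-independent
                zos.putNextEntry(new ZipEntry(directory.relativize(file).toString()));
                Files.copy(file, zos);
                zos.closeEntry();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // mirror the test fixture above: a directory with one contained file
        Path dir = Files.createTempDirectory("directory1");
        Files.write(dir.resolve("containedFile1"), "payload".getBytes());
        Path zip = Files.createTempFile("directory1", ".zip");
        zipDirectory(dir, zip);
        System.out.println("zip created: " + Files.exists(zip));
    }
}
```

With something like this, the reviewer's question ("should we also check whether the zip file has been created?") becomes a simple `Files.exists`/entry-count assertion on the archive produced at submission time.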