[
https://issues.apache.org/jira/browse/FLINK-9623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519236#comment-16519236
]
ASF GitHub Bot commented on FLINK-9623:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6187#discussion_r197089159
--- Diff: flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java ---
@@ -206,4 +222,54 @@ public boolean filter(Tuple2<Long, Long> value) throws Exception {
assertTrue(sinkVertex.getPreferredResources().equals(resource6));
assertTrue(iterationSyncVertex.getMinResources().equals(resource3));
}
+
+ @Test
+ public void testArtifactCompression() throws IOException {
+ Path plainFile1 = tmp.newFile("plainFile1").toPath();
+ Path plainFile2 = tmp.newFile("plainFile2").toPath();
+
+ Path directory1 = tmp.newFolder("directory1").toPath();
+ Files.createDirectory(directory1.resolve("containedFile1"));
+
+ Path directory2 = tmp.newFolder("directory2").toPath();
+ Files.createDirectory(directory2.resolve("containedFile2"));
+
+ JobGraph jb = new JobGraph();
+
+ final String executableFileName = "executableFile";
+ final String nonExecutableFileName = "nonExecutableFile";
+ final String executableDirName = "executableDir";
+ final String nonExecutableDirName = "nonExecutableDIr";
+
+ Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> originalArtifacts = Arrays.asList(
+ Tuple2.of(executableFileName, new DistributedCache.DistributedCacheEntry(plainFile1.toString(), true)),
+ Tuple2.of(nonExecutableFileName, new DistributedCache.DistributedCacheEntry(plainFile2.toString(), false)),
+ Tuple2.of(executableDirName, new DistributedCache.DistributedCacheEntry(directory1.toString(), true)),
+ Tuple2.of(nonExecutableDirName, new DistributedCache.DistributedCacheEntry(directory2.toString(), false))
+ );
+
+ JobGraphGenerator.addUserArtifactEntries(originalArtifacts, jb);
+
+ Map<String, DistributedCache.DistributedCacheEntry> submittedArtifacts = jb.getUserArtifacts();
+
+ DistributedCache.DistributedCacheEntry executableFileEntry = submittedArtifacts.get(executableFileName);
+ assertState(executableFileEntry, true, false);
+
+ DistributedCache.DistributedCacheEntry nonExecutableFileEntry = submittedArtifacts.get(nonExecutableFileName);
+ assertState(nonExecutableFileEntry, false, false);
+
+ DistributedCache.DistributedCacheEntry executableDirEntry = submittedArtifacts.get(executableDirName);
+ assertState(executableDirEntry, true, true);
+
+ DistributedCache.DistributedCacheEntry nonExecutableDirEntry = submittedArtifacts.get(nonExecutableDirName);
+ assertState(nonExecutableDirEntry, false, true);
+ }
--- End diff ---
Should we also check whether the zip file has been created?
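One possible shape for such a check, as a sketch only: it assumes DistributedCache.DistributedCacheEntry exposes a public filePath field that the generator rewrites to point at the generated archive, that the archive keeps a .zip suffix, and that the usual JUnit static asserts are imported in the test class; the helper name is illustrative.

    private static void assertZipCreated(DistributedCache.DistributedCacheEntry entry) throws IOException {
    	assertNotNull(entry);
    	// the directory entry should have been rewritten to point at the generated archive
    	org.apache.flink.core.fs.Path dstPath = new org.apache.flink.core.fs.Path(entry.filePath);
    	final org.apache.flink.core.fs.FileSystem fs = dstPath.getFileSystem();
    	// the zipped artifact must exist as a regular file, not a directory
    	assertTrue(fs.exists(dstPath));
    	assertFalse(fs.getFileStatus(dstPath).isDir());
    	// and carry a .zip suffix so the FileCache knows to unzip it (assumption about the naming scheme)
    	assertTrue(dstPath.getName().endsWith(".zip"));
    }

The test above could then call this for executableDirEntry and nonExecutableDirEntry right after the existing assertState checks.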
> Move zipping logic out of blobservice
> -------------------------------------
>
> Key: FLINK-9623
> URL: https://issues.apache.org/jira/browse/FLINK-9623
> Project: Flink
> Issue Type: Improvement
> Components: Job-Submission
> Affects Versions: 1.6.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Directories given to the blob-service (primarily a use-case for the
> distributed cache) are currently silently zipped, and later unzipped by the
> {{FileCache}}. This tightly couples the zipping logic in the blob-service to
> the unzipping logic of the {{FileCache}}. The blob-service neither unzips
> the directory when the blob is requested, nor does it provide any means of
> doing so manually, nor does it inform the user as to whether the requested
> blob is a zip or not.
> My conclusion is that the blob-service should not support directories _for
> now_, and that instead directories for the {{distributed cache}} should be
> explicitly zipped beforehand, given that this is the only use-case we have at
> the moment.
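For illustration, a minimal self-contained sketch of zipping a directory up front so that only a regular file is handed to the blob-service. It uses plain java.util.zip rather than any actual Flink utility, and the class and method names are made up:

    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.stream.Stream;
    import java.util.zip.ZipEntry;
    import java.util.zip.ZipOutputStream;

    public final class DirectoryZipper {

    	/** Compresses the given directory into a zip file next to it and returns the path of that zip. */
    	public static Path zipDirectory(Path directory) throws IOException {
    		Path zipFile = directory.resolveSibling(directory.getFileName() + ".zip");
    		try (OutputStream out = Files.newOutputStream(zipFile);
    				ZipOutputStream zip = new ZipOutputStream(out);
    				Stream<Path> files = Files.walk(directory)) {
    			for (Path file : (Iterable<Path>) files::iterator) {
    				if (Files.isRegularFile(file)) {
    					// store entries relative to the directory root so unzipping recreates the layout
    					zip.putNextEntry(new ZipEntry(directory.relativize(file).toString()));
    					Files.copy(file, zip);
    					zip.closeEntry();
    				}
    			}
    		}
    		return zipFile;
    	}
    }

The resulting zip could then be registered like any other plain file, e.g. via env.registerCachedFile(zipFile.toString(), "myDir"), leaving the unzipping to the consuming side.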
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)