[ https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510982#comment-16510982 ]
ASF GitHub Bot commented on FLINK-9280:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6147#discussion_r195035751

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
    @@ -342,4 +320,54 @@ public void run() {
     			}
     		}
     	}
    +
    +	public static Path compressDirectory(Path directory) throws IOException {
    +		FileSystem fs = directory.getFileSystem();
    +		java.nio.file.Path tmp = Files.createTempFile("flink-distributed-cache", ".zip");
    +		try (ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmp.toFile()))) {
    +			addToZip(directory, fs, directory.getParent(), out);
    +		}
    +		return new Path(tmp.toUri());
    +	}
    +
    +	private static void addToZip(Path fileOrDirectory, FileSystem fs, Path rootDir, ZipOutputStream out) throws IOException {
    +		String relativePath = fileOrDirectory.getPath().replace(rootDir.getPath() + '/', "");
    +		if (fs.getFileStatus(fileOrDirectory).isDir()) {
    +			out.putNextEntry(new ZipEntry(relativePath + '/'));
    +			for (FileStatus containedFile : fs.listStatus(fileOrDirectory)) {
    +				addToZip(containedFile.getPath(), fs, rootDir, out);
    +			}
    +		} else {
    +			ZipEntry entry = new ZipEntry(relativePath);
    +			out.putNextEntry(entry);
    +
    +			try (FSDataInputStream in = fs.open(fileOrDirectory)) {
    +				IOUtils.copyBytes(in, out, false);
    +			}
    +			out.closeEntry();
    +		}
    +	}
    +
    +	@VisibleForTesting
    +	static Path expandDirectory(File file, File targetDirectory, boolean isExecutable) throws IOException {
    +		java.nio.file.Path rootDir = null;
    +		try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
    +			ZipEntry entry;
    +			while ((entry = zis.getNextEntry()) != null) {
    +				java.nio.file.Path relativePath = Paths.get(entry.getName());
    +				rootDir = relativePath.getName(0);
    +
    +				java.nio.file.Path newFile = targetDirectory.toPath().resolve(relativePath);
    +				if (entry.isDirectory()) {
    +					Files.createDirectories(newFile);
    +				} else {
    +					Files.copy(zis, newFile);
    +					//noinspection ResultOfMethodCallIgnored
    +					newFile.toFile().setExecutable(isExecutable);
    --- End diff --

    This method violates the SRP by both expanding the zip and setting file permissions. It might be easier to separate these steps (especially if we move these methods to `FileUtils`).


> Extend JobSubmitHandler to accept jar files
> -------------------------------------------
>
>                 Key: FLINK-9280
>                 URL: https://issues.apache.org/jira/browse/FLINK-9280
>             Project: Flink
>          Issue Type: New Feature
>          Components: Job-Submission, REST
>    Affects Versions: 1.5.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>            Priority: Critical
>             Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all required jars to the blob
> server, sets the blob keys in the jobgraph, and then uploads this graph to
> the {{JobSubmitHandler}}, which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the
> blobserver before submitting the job graph, which does not happen via REST.
> I propose an extension to the {{JobSubmitHandler}} to also accept an
> optional list of jar files that were previously uploaded through the
> {{JarUploadHandler}}. If present, the handler would upload these jars to the
> blobserver and set the blob keys.
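
For illustration, the separation suggested in the review comment on `expandDirectory` might look roughly like the sketch below. This is not code from the pull request: the class name `ZipExpansionSketch`, both method signatures, and the path-escape check are assumptions made for the example, and it uses plain `java.nio.file` types rather than Flink's `Path`.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

// Hypothetical helper class; the name and API are assumptions, not Flink code.
final class ZipExpansionSketch {

	private ZipExpansionSketch() {}

	/** Step 1: unpack the archive only. Returns the top-level directory it created. */
	static Path expandDirectory(Path zipFile, Path targetDirectory) throws IOException {
		Path rootDir = null;
		try (InputStream in = Files.newInputStream(zipFile);
				ZipInputStream zis = new ZipInputStream(in)) {
			ZipEntry entry;
			while ((entry = zis.getNextEntry()) != null) {
				Path relativePath = Paths.get(entry.getName());
				if (rootDir == null) {
					rootDir = relativePath.getName(0);
				}
				Path newFile = targetDirectory.resolve(relativePath).normalize();
				// Extra safety check that is not part of the reviewed diff:
				// reject entries that would land outside the target directory.
				if (!newFile.startsWith(targetDirectory.normalize())) {
					throw new IOException("Entry escapes target directory: " + entry.getName());
				}
				if (entry.isDirectory()) {
					Files.createDirectories(newFile);
				} else {
					Files.createDirectories(newFile.getParent());
					Files.copy(zis, newFile);
				}
				zis.closeEntry();
			}
		}
		if (rootDir == null) {
			throw new IOException("Archive contains no entries: " + zipFile);
		}
		return targetDirectory.resolve(rootDir);
	}

	/** Step 2: permissions only. Applies the flag to every regular file under root. */
	static void setExecutable(Path root, boolean executable) throws IOException {
		try (Stream<Path> paths = Files.walk(root)) {
			paths.filter(Files::isRegularFile)
				.forEach(p -> p.toFile().setExecutable(executable));
		}
	}
}
```

A caller would then chain the two steps, for example `Path root = ZipExpansionSketch.expandDirectory(zip, target);` followed by `ZipExpansionSketch.setExecutable(root, isExecutable);`, so the unpacking logic can be tested without touching file permissions.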