[ https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510982#comment-16510982 ]
ASF GitHub Bot commented on FLINK-9280:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6147#discussion_r195035751
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
@@ -342,4 +320,54 @@ public void run() {
}
}
}
+
+    public static Path compressDirectory(Path directory) throws IOException {
+        FileSystem fs = directory.getFileSystem();
+        java.nio.file.Path tmp = Files.createTempFile("flink-distributed-cache", ".zip");
+        try (ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmp.toFile()))) {
+            addToZip(directory, fs, directory.getParent(), out);
+        }
+        return new Path(tmp.toUri());
+    }
+
+    private static void addToZip(Path fileOrDirectory, FileSystem fs, Path rootDir, ZipOutputStream out) throws IOException {
+        String relativePath = fileOrDirectory.getPath().replace(rootDir.getPath() + '/', "");
+        if (fs.getFileStatus(fileOrDirectory).isDir()) {
+            out.putNextEntry(new ZipEntry(relativePath + '/'));
+            for (FileStatus containedFile : fs.listStatus(fileOrDirectory)) {
+                addToZip(containedFile.getPath(), fs, rootDir, out);
+            }
+        } else {
+            ZipEntry entry = new ZipEntry(relativePath);
+            out.putNextEntry(entry);
+
+            try (FSDataInputStream in = fs.open(fileOrDirectory)) {
+                IOUtils.copyBytes(in, out, false);
+            }
+            out.closeEntry();
+        }
+    }
+
+    @VisibleForTesting
+    static Path expandDirectory(File file, File targetDirectory, boolean isExecutable) throws IOException {
+        java.nio.file.Path rootDir = null;
+        try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
+            ZipEntry entry;
+            while ((entry = zis.getNextEntry()) != null) {
+                java.nio.file.Path relativePath = Paths.get(entry.getName());
+                rootDir = relativePath.getName(0);
+
+                java.nio.file.Path newFile = targetDirectory.toPath().resolve(relativePath);
+                if (entry.isDirectory()) {
+                    Files.createDirectories(newFile);
+                } else {
+                    Files.copy(zis, newFile);
+                    //noinspection ResultOfMethodCallIgnored
+                    newFile.toFile().setExecutable(isExecutable);
--- End diff --
This method violates the single responsibility principle (SRP): it both
expands the zip and sets file permissions. It might be easier to separate
these steps, especially if we move these methods to `FileUtils`.
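
One possible shape for that separation, as a rough sketch rather than the code from the pull request: the class name `ZipExpansionSketch` and both method signatures are hypothetical, and the sketch uses only `java.nio.file` and `java.util.zip` instead of Flink's own `Path` and `FileSystem` types. The first method only unpacks the archive; the second only applies the executable flag.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

/** Hypothetical helper; names are illustrative and not taken from the pull request. */
final class ZipExpansionSketch {

    private ZipExpansionSketch() {}

    /** Step 1: unpack the archive and return the extracted root directory. No permission handling here. */
    static Path expandDirectory(Path zipFile, Path targetDirectory) throws IOException {
        Path rootDir = null;
        try (ZipInputStream zis = new ZipInputStream(Files.newInputStream(zipFile))) {
            ZipEntry entry;
            while ((entry = zis.getNextEntry()) != null) {
                Path relativePath = Paths.get(entry.getName());
                if (rootDir == null) {
                    // Assumes all entries share one top-level directory, as in the diff.
                    rootDir = relativePath.getName(0);
                }
                Path newFile = targetDirectory.resolve(relativePath);
                if (entry.isDirectory()) {
                    Files.createDirectories(newFile);
                } else {
                    // Tolerate archives that omit explicit directory entries.
                    Files.createDirectories(newFile.getParent());
                    Files.copy(zis, newFile);
                }
            }
        }
        if (rootDir == null) {
            throw new IOException("Archive contains no entries: " + zipFile);
        }
        return targetDirectory.resolve(rootDir);
    }

    /** Step 2: apply the executable flag to every regular file below the given root. */
    static void setExecutable(Path root, boolean executable) throws IOException {
        try (Stream<Path> files = Files.walk(root)) {
            //noinspection ResultOfMethodCallIgnored
            files.filter(Files::isRegularFile)
                 .forEach(p -> p.toFile().setExecutable(executable));
        }
    }
}
```

A caller would then chain the two steps, for example `ZipExpansionSketch.setExecutable(ZipExpansionSketch.expandDirectory(zip, target), true)`, and the expansion step could be tested without touching permissions.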
> Extend JobSubmitHandler to accept jar files
> -------------------------------------------
>
> Key: FLINK-9280
> URL: https://issues.apache.org/jira/browse/FLINK-9280
> Project: Flink
> Issue Type: New Feature
> Components: Job-Submission, REST
> Affects Versions: 1.5.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all required jars to the blob
> server, sets the blob keys in the jobgraph, and then uploads this graph to
> the {{JobSubmitHandler}}, which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the
> blobserver before submitting the job graph, which does not happen via REST.
> I propose an extension to the {{JobSubmitHandler}} to also accept an
> optional list of jar files that were previously uploaded through the
> {{JarUploadHandler}}. If present, the handler would upload these jars to the
> blobserver and set the blob keys.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)