[
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410985#comment-16410985
]
ASF GitHub Bot commented on FLINK-8620:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5580#discussion_r176660974
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -527,59 +403,83 @@ else if (response == RETURN_ERROR) {
* Any additional configuration for the blob client
* @param jobId
* ID of the job this blob belongs to (or <tt>null</tt> if
job-unrelated)
- * @param jars
- * List of JAR files to upload
+ * @param files
+ * List of files to upload
*
* @throws IOException
* if the upload fails
*/
- public static List<PermanentBlobKey> uploadJarFiles(
- InetSocketAddress serverAddress, Configuration
clientConfig, JobID jobId, List<Path> jars)
+ public static List<PermanentBlobKey> uploadFiles(
+ InetSocketAddress serverAddress, Configuration
clientConfig, JobID jobId, List<Path> files)
throws IOException {
checkNotNull(jobId);
- if (jars.isEmpty()) {
+ if (files.isEmpty()) {
return Collections.emptyList();
} else {
List<PermanentBlobKey> blobKeys = new ArrayList<>();
try (BlobClient blobClient = new
BlobClient(serverAddress, clientConfig)) {
- for (final Path jar : jars) {
- final FileSystem fs =
jar.getFileSystem();
- FSDataInputStream is = null;
- try {
- is = fs.open(jar);
- final PermanentBlobKey key =
- (PermanentBlobKey)
blobClient.putInputStream(jobId, is, PERMANENT_BLOB);
- blobKeys.add(key);
- } finally {
- if (is != null) {
- is.close();
- }
- }
+ for (final Path file : files) {
+ final PermanentBlobKey key =
blobClient.uploadFile(jobId, file);
+ blobKeys.add(key);
}
}
return blobKeys;
}
}
- //
--------------------------------------------------------------------------------------------
- // Miscellaneous
- //
--------------------------------------------------------------------------------------------
-
- private static Throwable readExceptionFromStream(InputStream in) throws
IOException {
- int len = readLength(in);
- byte[] bytes = new byte[len];
- readFully(in, bytes, 0, len, "Error message");
+ /**
+ * Uploads a single file to the {@link PermanentBlobService} of the
given {@link BlobServer}.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to (or <tt>null</tt> if
job-unrelated)
+ * @param file
+ * file to upload
+ *
+ * @throws IOException
+ * if the upload fails
+ */
+ public PermanentBlobKey uploadFile(JobID jobId, Path file) throws
IOException {
+ final FileSystem fs = file.getFileSystem();
+ if (fs.getFileStatus(file).isDir()) {
+ return uploadDirectory(jobId, file, fs);
+ } else {
+ try (InputStream is = fs.open(file)) {
+ return (PermanentBlobKey) putInputStream(jobId,
is, PERMANENT_BLOB);
+ }
+ }
+ }
- try {
- return (Throwable)
InstantiationUtil.deserializeObject(bytes, ClassLoader.getSystemClassLoader());
+ private PermanentBlobKey uploadDirectory(JobID jobId, Path file,
FileSystem fs) throws IOException {
+ try (BlobOutputStream blobOutputStream = new
BlobOutputStream(jobId, PERMANENT_BLOB, socket)) {
+ try (ZipOutputStream zipStream = new
ZipOutputStream(blobOutputStream)) {
+ compressDirectoryToZipfile(fs,
fs.getFileStatus(file), fs.getFileStatus(file), zipStream);
+ zipStream.finish();
+ return (PermanentBlobKey)
blobOutputStream.finish();
+ }
}
- catch (ClassNotFoundException e) {
- // should never occur
- throw new IOException("Could not transfer error
message", e);
+ }
+
+ private static void compressDirectoryToZipfile(FileSystem fs,
FileStatus rootDir, FileStatus sourceDir, ZipOutputStream out) throws
IOException {
+ for (FileStatus file : fs.listStatus(sourceDir.getPath())) {
+ LOG.info("Zipping file: {}", file);
+ if (file.isDir()) {
+ compressDirectoryToZipfile(fs, rootDir, file,
out);
+ } else {
+ String entryName =
file.getPath().getPath().replace(rootDir.getPath().getPath(), "");
--- End diff --
couldn't you use `Path#getName()`?
> Enable shipping custom artifacts to BlobStore and accessing them through
> DistributedCache
> -----------------------------------------------------------------------------------------
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
> Issue Type: New Feature
> Reporter: Dawid Wysakowicz
> Assignee: Dawid Wysakowicz
> Priority: Major
>
> We should be able to distribute custom files to TaskManagers. To do that, we
> can store those files in the BlobStore and later access them in TaskManagers
> through the DistributedCache.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)