Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2739#discussion_r198983226
  
    --- Diff: 
storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java ---
    @@ -59,57 +59,47 @@ protected LocallyCachedBlob(String blobDescription, 
String blobKey) {
             this.blobKey = blobKey;
         }
     
    -    protected static long downloadToTempLocation(ClientBlobStore store, 
String key, long currentVersion, IAdvancedFSOps fsOps,
    -                                                 Function<Long, Path> 
getTempPath)
    -        throws KeyNotFoundException, AuthorizationException, IOException {
    +    /**
    +     * Helper function to download blob from blob store.
    +     * @param store Blob store to fetch blobs from
    +     * @param key Key to retrieve blobs
    +     * @param pathSupplier A function that supplies the download 
destination of a blob. It guarantees the validity
    +     *                     of path or throws {@link IOException}
    +     * @param outStreamSupplier A function that supplies the {@link 
OutputStream} object
    +     * @return The metadata of the download session, including blob's 
version and download destination
    +     * @throws KeyNotFoundException Thrown if key to retrieve blob is 
invalid
    +     * @throws AuthorizationException Thrown if the retrieval is not under 
security authorization
    +     * @throws IOException Thrown if any IO error occurs
    +     */
    +    protected DownloadMeta fetch(ClientBlobStore store, String key,
    +                                 IOFunction<Long, Path> pathSupplier,
    +                                 IOFunction<File, OutputStream> 
outStreamSupplier)
    +            throws KeyNotFoundException, AuthorizationException, 
IOException {
    +
             try (InputStreamWithMeta in = store.getBlob(key)) {
                 long newVersion = in.getVersion();
    -            if (newVersion == currentVersion) {
    -                LOG.warn("The version did not change, but going to 
download again {} {}", currentVersion, key);
    +            if (newVersion == getLocalVersion()) {
    +                LOG.warn("The version did not change, but going to 
download again {} {}", getLocalVersion(), key);
                 }
    -            Path tmpLocation = getTempPath.apply(newVersion);
    -            long totalRead = 0;
    +
                 //Make sure the parent directory is there and ready to go
    -            fsOps.forceMkdir(tmpLocation.getParent());
    -            try (OutputStream outStream = 
fsOps.getOutputStream(tmpLocation.toFile())) {
    +            Path downloadPath = pathSupplier.apply(newVersion);
    +            LOG.debug("Downloading {} to {}", key, downloadPath);
    +
    +            long totalRead = 0;
    +            try (OutputStream out = 
outStreamSupplier.apply(downloadPath.toFile())) {
                     byte[] buffer = new byte[4096];
    -                int read = 0;
    -                while ((read = in.read(buffer)) > 0) {
    -                    outStream.write(buffer, 0, read);
    +                int read;
    +                while ((read = in.read(buffer)) >= 0) {
    +                    out.write(buffer, 0, read);
                         totalRead += read;
                     }
                 }
                 long expectedSize = in.getFileLength();
                 if (totalRead != expectedSize) {
    --- End diff --
    
    It was mostly about where we were seeing errors at the time.


---

Reply via email to