dawidwys commented on a change in pull request #7161: [FLINK-10963][fs-connector, s3] Cleanup tmp S3 objects uploaded as backups of in-progress files.
URL: https://github.com/apache/flink/pull/7161#discussion_r238276552
 
 

 ##########
 File path: flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
 ##########
 @@ -86,41 +84,17 @@ RecoverableMultiPartUpload recoverRecoverableUpload(S3Recoverable recoverable) t
 				incompletePart);
 	}
 
-	@VisibleForTesting
-	Optional<File> downloadLastDataChunk(S3Recoverable recoverable) throws IOException {
+	private Optional<File> recoverInProgressPart(S3Recoverable recoverable) throws IOException {
 
-		final String objectName = recoverable.incompleteObjectName();
-		if (objectName == null) {
+		final String objectKey = recoverable.incompleteObjectName();
+		if (objectKey == null) {
 			return Optional.empty();
 		}
 
 		// download the file (simple way)
-		final RefCountedFile fileAndStream = tmpFileSupplier.apply(null);
-		final File file = fileAndStream.getFile();
-
-		long numBytes = 0L;
-
-		try (
-				final OffsetAwareOutputStream outStream = fileAndStream.getStream();
-				final org.apache.hadoop.fs.FSDataInputStream inStream =
-						fs.open(new org.apache.hadoop.fs.Path('/' + objectName))
-		) {
-			final byte[] buffer = new byte[32 * 1024];
-
-			int numRead;
-			while ((numRead = inStream.read(buffer)) > 0) {
-				outStream.write(buffer, 0, numRead);
-				numBytes += numRead;
-			}
-		}
-
-		// some sanity checks
-		if (numBytes != file.length() || numBytes != fileAndStream.getStream().getLength()) {
-			throw new IOException(String.format("Error recovering writer: " +
-							"Downloading the last data chunk file gives incorrect length. " +
-							"File=%d bytes, Stream=%d bytes",
-				file.length(), numBytes));
-		}
+		final RefCountedFile refCountedFile = tmpFileSupplier.apply(null);
+		final File file = refCountedFile.getFile();
+		final long numBytes = s3AccessHelper.getObject(objectKey, file);
 
 Review comment:
   I don't fully understand the difference, but you have changed the stream used to download the file from S3: previously you wrote through `fileAndStream.getStream()`, whereas now you write through `new FileOutputStream(fileAndStream.getFile())`. Is there any difference? Is it OK to change it?
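
   For context, here is a minimal sketch (not Flink code; the class name is hypothetical) of the kind of byte counting the old `OffsetAwareOutputStream` path provided and that a plain `FileOutputStream` does not, which is what the removed sanity check relied on:

   ```java
   import java.io.IOException;
   import java.io.OutputStream;

   // Hypothetical sketch: an output stream that independently counts the bytes
   // written through it, similar in spirit to what OffsetAwareOutputStream's
   // getLength() offered for the removed cross-check against file.length().
   final class LengthTrackingOutputStream extends OutputStream {

       private final OutputStream delegate;
       private long length;

       LengthTrackingOutputStream(OutputStream delegate) {
           this.delegate = delegate;
       }

       @Override
       public void write(int b) throws IOException {
           delegate.write(b);
           length++;
       }

       @Override
       public void write(byte[] b, int off, int len) throws IOException {
           delegate.write(b, off, len);
           length += len;
       }

       // Independent byte counter; a plain FileOutputStream exposes no
       // equivalent, so only file.length() would remain for validation.
       long getLength() {
           return length;
       }

       @Override
       public void close() throws IOException {
           delegate.close();
       }
   }
   ```

   If `s3AccessHelper.getObject(objectKey, file)` does write through a plain `FileOutputStream`, then `file.length()` and the returned byte count appear to be the only lengths left to validate against each other.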

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services