This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cf793009b5f16976efa0698f2d6ca5f4a4139c63
Author: Kostas Kloudas <[email protected]>
AuthorDate: Thu Nov 22 10:46:10 2018 +0100

    [hotfix] Consolidated all S3 accesses under the S3AccessHelper.
---
 .../writer/RecoverableMultiPartUploadImpl.java     |  2 +-
 .../flink/fs/s3/common/writer/S3AccessHelper.java  | 21 +++++++++---
 .../S3RecoverableMultipartUploadFactory.java       | 37 ++++++----------------
 .../writer/RecoverableMultiPartUploadImplTest.java | 11 +++++--
 .../flink/fs/s3hadoop/HadoopS3AccessHelper.java    | 25 ++++++++++++++-
 5 files changed, 58 insertions(+), 38 deletions(-)
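
The change below routes every S3 request issued by the recoverable writer through the S3AccessHelper interface. For orientation, the consolidated single-object entry points look roughly like this; the excerpt is reassembled from the hunks below and is not the complete interface:

    // Excerpt of the consolidated S3AccessHelper surface (see the full diff below).
    public interface S3AccessHelper {
        // Multipart path: a part is tied to an ongoing MPU via its uploadId.
        UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream file, long length) throws IOException;
        // Plain PUT (renamed from uploadIncompletePart): the object belongs to no MPU.
        PutObjectResult putObject(String key, InputStream file, long length) throws IOException;
        // New plain GET into a local file, used by the recovery path.
        long getObject(String key, File targetLocation) throws IOException;
    }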

diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
index fe2a4cd..9f0a811 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
@@ -179,7 +179,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
                        // they do not fall under the user's global TTL on S3.
                        // Figure out a way to clean them.
 
-                       s3AccessHelper.uploadIncompletePart(incompletePartObjectName, inputStream, file.getPos());
+                       s3AccessHelper.putObject(incompletePartObjectName, inputStream, file.getPos());
                }
                finally {
                        file.release();
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
index 57920a5..dbc099a 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
@@ -26,6 +26,7 @@ import com.amazonaws.services.s3.model.PartETag;
 import com.amazonaws.services.s3.model.PutObjectResult;
 import com.amazonaws.services.s3.model.UploadPartResult;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
@@ -66,10 +67,9 @@ public interface S3AccessHelper {
        UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream file, long length) throws IOException;
 
        /**
-        * Uploads a part and associates it with the MPU with the provided {@code uploadId}.
-        *
-        * <p>Contrary to the {@link #uploadIncompletePart(String, InputStream, long)}, this part can
-        * be smaller than the minimum part size imposed by S3.
+        * Uploads an object to S3. Contrary to the {@link #uploadPart(String, String, int, InputStream, long)} method,
+        * this object will not be associated with any MPU and, as such, it is not subject to the garbage collection
+        * policies specified for your S3 bucket.
         *
         * @param key the key used to identify this part.
         * @param file the (local) file holding the data to be uploaded.
@@ -77,7 +77,7 @@ public interface S3AccessHelper {
         * @return The {@link PutObjectResult result} of the attempt to stage the incomplete part.
         * @throws IOException
         */
-       PutObjectResult uploadIncompletePart(String key, InputStream file, long length) throws IOException;
+       PutObjectResult putObject(String key, InputStream file, long length) throws IOException;
 
        /**
         * Finalizes a Multi-Part Upload.
@@ -93,6 +93,17 @@ public interface S3AccessHelper {
        CompleteMultipartUploadResult commitMultiPartUpload(String key, String uploadId, List<PartETag> partETags, long length, AtomicInteger errorCount) throws IOException;
 
        /**
+        * Gets the object associated with the provided {@code key} from S3 and
+        * puts it in the provided {@code targetLocation}.
+        *
+        * @param key the key of the object to fetch.
+        * @param targetLocation the file to read the object into.
+        * @return The number of bytes read.
+        * @throws IOException
+        */
+       long getObject(String key, File targetLocation) throws IOException;
+
+       /**
         * Fetches the metadata associated with a given key on S3.
         *
         * @param key the key.
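
A hedged usage sketch of the two single-object calls defined above; the helper variable, object key, and temp-file handling here are hypothetical and not taken from the commit:

    // Hypothetical caller; `helper` is any S3AccessHelper implementation.
    File tmp = File.createTempFile("s3-recovery", ".part");
    long numBytes = helper.getObject("ongoing/uploads/in-progress-part", tmp); // plain GET into a local file
    if (numBytes != tmp.length()) {
        throw new IOException("Downloaded length mismatch");
    }
    try (InputStream in = new FileInputStream(tmp)) {
        helper.putObject("ongoing/uploads/in-progress-part", in, tmp.length()); // plain PUT, no MPU involved
    }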
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
index 9a171ae..ddb09ab 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
@@ -19,10 +19,8 @@
 package org.apache.flink.fs.s3.common.writer;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.fs.s3.common.utils.BackPressuringExecutor;
-import org.apache.flink.fs.s3.common.utils.OffsetAwareOutputStream;
 import org.apache.flink.fs.s3.common.utils.RefCountedFile;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.util.Preconditions;
@@ -74,7 +72,7 @@ final class S3RecoverableMultipartUploadFactory {
        }
 
        RecoverableMultiPartUpload recoverRecoverableUpload(S3Recoverable recoverable) throws IOException {
-               final Optional<File> incompletePart = downloadLastDataChunk(recoverable);
+               final Optional<File> incompletePart = recoverInProgressPart(recoverable);
 
                return RecoverableMultiPartUploadImpl.recoverUpload(
                                s3AccessHelper,
@@ -86,36 +84,20 @@ final class S3RecoverableMultipartUploadFactory {
                                incompletePart);
        }
 
-       @VisibleForTesting
-       Optional<File> downloadLastDataChunk(S3Recoverable recoverable) throws IOException {
+       private Optional<File> recoverInProgressPart(S3Recoverable recoverable) throws IOException {
 
-               final String objectName = recoverable.incompleteObjectName();
-               if (objectName == null) {
+               final String objectKey = recoverable.incompleteObjectName();
+               if (objectKey == null) {
                        return Optional.empty();
                }
 
                // download the file (simple way)
-               final RefCountedFile fileAndStream = tmpFileSupplier.apply(null);
-               final File file = fileAndStream.getFile();
-
-               long numBytes = 0L;
-
-               try (
-                               final OffsetAwareOutputStream outStream = fileAndStream.getStream();
-                               final org.apache.hadoop.fs.FSDataInputStream inStream =
-                                               fs.open(new org.apache.hadoop.fs.Path('/' + objectName))
-               ) {
-                       final byte[] buffer = new byte[32 * 1024];
-
-                       int numRead;
-                       while ((numRead = inStream.read(buffer)) > 0) {
-                               outStream.write(buffer, 0, numRead);
-                               numBytes += numRead;
-                       }
-               }
+               final RefCountedFile refCountedFile = tmpFileSupplier.apply(null);
+               final File file = refCountedFile.getFile();
+               final long numBytes = s3AccessHelper.getObject(objectKey, file);
 
                // some sanity checks
-               if (numBytes != file.length() || numBytes != fileAndStream.getStream().getLength()) {
+               if (numBytes != file.length()) {
                        throw new IOException(String.format("Error recovering writer: " +
                                                        "Downloading the last data chunk file gives incorrect length. " +
                                                        "File=%d bytes, Stream=%d bytes",
@@ -132,8 +114,7 @@ final class S3RecoverableMultipartUploadFactory {
                return Optional.of(file);
        }
 
-       @VisibleForTesting
-       String pathToObjectName(final Path path) {
+       private String pathToObjectName(final Path path) {
                org.apache.hadoop.fs.Path hadoopPath = HadoopFileSystem.toHadoopPath(path);
                if (!hadoopPath.isAbsolute()) {
                        hadoopPath = new org.apache.hadoop.fs.Path(fs.getWorkingDirectory(), hadoopPath);
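
With the byte-copying loop moved behind the helper, the recovery path above collapses to a few lines. Roughly, reassembled from the hunks in this file (the exception message is shortened here):

    private Optional<File> recoverInProgressPart(S3Recoverable recoverable) throws IOException {
        final String objectKey = recoverable.incompleteObjectName();
        if (objectKey == null) {
            return Optional.empty();
        }
        final File file = tmpFileSupplier.apply(null).getFile();
        final long numBytes = s3AccessHelper.getObject(objectKey, file);
        if (numBytes != file.length()) {
            throw new IOException("Error recovering writer: downloaded length mismatch");
        }
        return Optional.of(file);
    }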
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
index 4c2f147..a986111 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
@@ -347,11 +347,11 @@ public class RecoverableMultiPartUploadImplTest {
                private final List<RecoverableMultiPartUploadImplTest.TestUploadPartResult> completePartsUploaded = new ArrayList<>();
                private final List<RecoverableMultiPartUploadImplTest.TestPutObjectResult> incompletePartsUploaded = new ArrayList<>();
 
-               public List<RecoverableMultiPartUploadImplTest.TestUploadPartResult> getCompletePartsUploaded() {
+               List<RecoverableMultiPartUploadImplTest.TestUploadPartResult> getCompletePartsUploaded() {
                        return completePartsUploaded;
                }
 
-               public List<RecoverableMultiPartUploadImplTest.TestPutObjectResult> getIncompletePartsUploaded() {
+               List<RecoverableMultiPartUploadImplTest.TestPutObjectResult> getIncompletePartsUploaded() {
                        return incompletePartsUploaded;
                }
 
@@ -367,12 +367,17 @@ public class RecoverableMultiPartUploadImplTest {
                }
 
                @Override
-               public PutObjectResult uploadIncompletePart(String key, InputStream file, long length) throws IOException {
+               public PutObjectResult putObject(String key, InputStream file, long length) throws IOException {
                        final byte[] content = getFileContentBytes(file, MathUtils.checkedDownCast(length));
                        return storeAndGetPutObjectResult(key, content);
                }
 
                @Override
+               public long getObject(String key, File targetLocation) throws IOException {
+                       return 0;
+               }
+
+               @Override
                public CompleteMultipartUploadResult commitMultiPartUpload(
                                String key,
                                String uploadId,
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
index f833471..473439c 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
@@ -35,8 +35,11 @@ import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AUtils;
 import org.apache.hadoop.fs.s3a.WriteOperationHelper;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -72,7 +75,7 @@ public class HadoopS3AccessHelper implements S3AccessHelper {
        }
 
        @Override
-       public PutObjectResult uploadIncompletePart(String key, InputStream inputStream, long length) throws IOException {
+       public PutObjectResult putObject(String key, InputStream inputStream, long length) throws IOException {
                final PutObjectRequest putRequest = s3accessHelper.createPutObjectRequest(key, inputStream, length);
                return s3accessHelper.putObject(putRequest);
        }
@@ -83,6 +86,26 @@ public class HadoopS3AccessHelper implements S3AccessHelper {
        }
 
        @Override
+       public long getObject(String key, File targetLocation) throws IOException {
+               long numBytes = 0L;
+               try (
+                               final OutputStream outStream = new FileOutputStream(targetLocation);
+                               final org.apache.hadoop.fs.FSDataInputStream inStream =
+                                               s3a.open(new org.apache.hadoop.fs.Path('/' + key))
+               ) {
+                       final byte[] buffer = new byte[32 * 1024];
+
+                       int numRead;
+                       while ((numRead = inStream.read(buffer)) > 0) {
+                               outStream.write(buffer, 0, numRead);
+                               numBytes += numRead;
+                       }
+               }
+
+               return numBytes;
+       }
+
+       @Override
        public ObjectMetadata getObjectMetadata(String key) throws IOException {
                try {
                        return s3a.getObjectMetadata(new Path('/' + key));
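
A side note on getObject above: the hand-rolled 32 KiB copy loop could equally be written with the JDK's file utilities. A minimal alternative sketch, assuming the same s3a field and leading-slash key convention as the committed code (this is not what the commit does):

    try (InputStream in = s3a.open(new org.apache.hadoop.fs.Path('/' + key))) {
        // Files.copy returns the number of bytes copied, matching getObject's contract.
        return java.nio.file.Files.copy(in, targetLocation.toPath(),
                java.nio.file.StandardCopyOption.REPLACE_EXISTING);
    }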
