This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit cf793009b5f16976efa0698f2d6ca5f4a4139c63
Author: Kostas Kloudas <[email protected]>
AuthorDate: Thu Nov 22 10:46:10 2018 +0100

    [hotfix] Consolidated all S3 accesses under the S3AccessHelper.
---
 .../writer/RecoverableMultiPartUploadImpl.java     |  2 +-
 .../flink/fs/s3/common/writer/S3AccessHelper.java  | 21 +++++++++---
 .../S3RecoverableMultipartUploadFactory.java       | 37 ++++++----------------
 .../writer/RecoverableMultiPartUploadImplTest.java | 11 +++++--
 .../flink/fs/s3hadoop/HadoopS3AccessHelper.java    | 25 ++++++++++++++-
 5 files changed, 58 insertions(+), 38 deletions(-)

diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
index fe2a4cd..9f0a811 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
@@ -179,7 +179,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 			// they do not fall under the user's global TTL on S3.
 			// Figure out a way to clean them.
 
-			s3AccessHelper.uploadIncompletePart(incompletePartObjectName, inputStream, file.getPos());
+			s3AccessHelper.putObject(incompletePartObjectName, inputStream, file.getPos());
 		}
 		finally {
 			file.release();
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
index 57920a5..dbc099a 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
@@ -26,6 +26,7 @@ import com.amazonaws.services.s3.model.PartETag;
 import com.amazonaws.services.s3.model.PutObjectResult;
 import com.amazonaws.services.s3.model.UploadPartResult;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
@@ -66,10 +67,9 @@ public interface S3AccessHelper {
 	UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream file, long length) throws IOException;
 
 	/**
-	 * Uploads a part and associates it with the MPU with the provided {@code uploadId}.
-	 *
-	 * <p>Contrary to the {@link #uploadIncompletePart(String, InputStream, long)}, this part can
-	 * be smaller than the minimum part size imposed by S3.
+	 * Uploads an object to S3. Contrary to the {@link #uploadPart(String, String, int, InputStream, long)} method,
+	 * this object is not going to be associated to any MPU and, as such, it is not subject to the garbage collection
+	 * policies specified for your S3 bucket.
 	 *
 	 * @param key the key used to identify this part.
 	 * @param file the (local) file holding the data to be uploaded.
@@ -77,7 +77,7 @@
 	 * @return The {@link PutObjectResult result} of the attempt to stage the incomplete part.
 	 * @throws IOException
 	 */
-	PutObjectResult uploadIncompletePart(String key, InputStream file, long length) throws IOException;
+	PutObjectResult putObject(String key, InputStream file, long length) throws IOException;
 
 	/**
 	 * Finalizes a Multi-Part Upload.
@@ -93,6 +93,17 @@ public interface S3AccessHelper {
 	CompleteMultipartUploadResult commitMultiPartUpload(String key, String uploadId, List<PartETag> partETags, long length, AtomicInteger errorCount) throws IOException;
 
 	/**
+	 * Gets the object associated with the provided {@code key} from S3 and
+	 * puts it in the provided {@code targetLocation}.
+	 *
+	 * @param key the key of the object to fetch.
+	 * @param targetLocation the file to read the object to.
+	 * @return The number of bytes read.
+	 * @throws IOException
+	 */
+	long getObject(String key, File targetLocation) throws IOException;
+
+	/**
 	 * Fetches the metadata associated with a given key on S3.
 	 *
 	 * @param key the key.
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
index 9a171ae..ddb09ab 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
@@ -19,10 +19,8 @@
 package org.apache.flink.fs.s3.common.writer;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.fs.s3.common.utils.BackPressuringExecutor;
-import org.apache.flink.fs.s3.common.utils.OffsetAwareOutputStream;
 import org.apache.flink.fs.s3.common.utils.RefCountedFile;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.util.Preconditions;
@@ -74,7 +72,7 @@ final class S3RecoverableMultipartUploadFactory {
 	}
 
 	RecoverableMultiPartUpload recoverRecoverableUpload(S3Recoverable recoverable) throws IOException {
-		final Optional<File> incompletePart = downloadLastDataChunk(recoverable);
+		final Optional<File> incompletePart = recoverInProgressPart(recoverable);
 
 		return RecoverableMultiPartUploadImpl.recoverUpload(
 				s3AccessHelper,
@@ -86,36 +84,20 @@ final class S3RecoverableMultipartUploadFactory {
 				incompletePart);
 	}
 
-	@VisibleForTesting
-	Optional<File> downloadLastDataChunk(S3Recoverable recoverable) throws IOException {
+	private Optional<File> recoverInProgressPart(S3Recoverable recoverable) throws IOException {
 
-		final String objectName = recoverable.incompleteObjectName();
-		if (objectName == null) {
+		final String objectKey = recoverable.incompleteObjectName();
+		if (objectKey == null) {
 			return Optional.empty();
 		}
 
 		// download the file (simple way)
-		final RefCountedFile fileAndStream = tmpFileSupplier.apply(null);
-		final File file = fileAndStream.getFile();
-
-		long numBytes = 0L;
-
-		try (
-				final OffsetAwareOutputStream outStream = fileAndStream.getStream();
-				final org.apache.hadoop.fs.FSDataInputStream inStream =
-						fs.open(new org.apache.hadoop.fs.Path('/' + objectName))
-		) {
-			final byte[] buffer = new byte[32 * 1024];
-
-			int numRead;
-			while ((numRead = inStream.read(buffer)) > 0) {
-				outStream.write(buffer, 0, numRead);
-				numBytes += numRead;
-			}
-		}
+		final RefCountedFile refCountedFile = tmpFileSupplier.apply(null);
+		final File file = refCountedFile.getFile();
+		final long numBytes = s3AccessHelper.getObject(objectKey, file);
 
 		// some sanity checks
-		if (numBytes != file.length() || numBytes != fileAndStream.getStream().getLength()) {
+		if (numBytes != file.length()) {
 			throw new IOException(String.format("Error recovering writer: " +
 					"Downloading the last data chunk file gives incorrect length. " +
 					"File=%d bytes, Stream=%d bytes",
@@ -132,8 +114,7 @@ final class S3RecoverableMultipartUploadFactory {
 		return Optional.of(file);
 	}
 
-	@VisibleForTesting
-	String pathToObjectName(final Path path) {
+	private String pathToObjectName(final Path path) {
 		org.apache.hadoop.fs.Path hadoopPath = HadoopFileSystem.toHadoopPath(path);
 		if (!hadoopPath.isAbsolute()) {
 			hadoopPath = new org.apache.hadoop.fs.Path(fs.getWorkingDirectory(), hadoopPath);
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
index 4c2f147..a986111 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
@@ -347,11 +347,11 @@ public class RecoverableMultiPartUploadImplTest {
 		private final List<RecoverableMultiPartUploadImplTest.TestUploadPartResult> completePartsUploaded = new ArrayList<>();
 		private final List<RecoverableMultiPartUploadImplTest.TestPutObjectResult> incompletePartsUploaded = new ArrayList<>();
 
-		public List<RecoverableMultiPartUploadImplTest.TestUploadPartResult> getCompletePartsUploaded() {
+		List<RecoverableMultiPartUploadImplTest.TestUploadPartResult> getCompletePartsUploaded() {
 			return completePartsUploaded;
 		}
 
-		public List<RecoverableMultiPartUploadImplTest.TestPutObjectResult> getIncompletePartsUploaded() {
+		List<RecoverableMultiPartUploadImplTest.TestPutObjectResult> getIncompletePartsUploaded() {
 			return incompletePartsUploaded;
 		}
 
@@ -367,12 +367,17 @@ public class RecoverableMultiPartUploadImplTest {
 		}
 
 		@Override
-		public PutObjectResult uploadIncompletePart(String key, InputStream file, long length) throws IOException {
+		public PutObjectResult putObject(String key, InputStream file, long length) throws IOException {
 			final byte[] content = getFileContentBytes(file, MathUtils.checkedDownCast(length));
 			return storeAndGetPutObjectResult(key, content);
 		}
 
 		@Override
+		public long getObject(String key, File targetLocation) throws IOException {
+			return 0;
+		}
+
+		@Override
 		public CompleteMultipartUploadResult commitMultiPartUpload(
 				String key,
 				String uploadId,
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
index f833471..473439c 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
@@ -35,8 +35,11 @@ import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AUtils;
 import org.apache.hadoop.fs.s3a.WriteOperationHelper;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -72,7 +75,7 @@ public class HadoopS3AccessHelper implements S3AccessHelper {
 	}
 
 	@Override
-	public PutObjectResult uploadIncompletePart(String key, InputStream inputStream, long length) throws IOException {
+	public PutObjectResult putObject(String key, InputStream inputStream, long length) throws IOException {
 		final PutObjectRequest putRequest = s3accessHelper.createPutObjectRequest(key, inputStream, length);
 		return s3accessHelper.putObject(putRequest);
 	}
@@ -83,6 +86,26 @@ public class HadoopS3AccessHelper implements S3AccessHelper {
 	}
 
 	@Override
+	public long getObject(String key, File targetLocation) throws IOException {
+		long numBytes = 0L;
+		try (
+				final OutputStream outStream = new FileOutputStream(targetLocation);
+				final org.apache.hadoop.fs.FSDataInputStream inStream =
+						s3a.open(new org.apache.hadoop.fs.Path('/' + key))
+		) {
+			final byte[] buffer = new byte[32 * 1024];
+
+			int numRead;
+			while ((numRead = inStream.read(buffer)) > 0) {
+				outStream.write(buffer, 0, numRead);
+				numBytes += numRead;
+			}
+		}
+
+		return numBytes;
+	}
+
+	@Override
 	public ObjectMetadata getObjectMetadata(String key) throws IOException {
 		try {
 			return s3a.getObjectMetadata(new Path('/' + key));
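
[Editor's note] For readers skimming the diff: the consolidation boils down to a symmetric pair on S3AccessHelper. putObject(key, stream, length) stages the in-progress part as a standalone (non-MPU) object, which is what the writer does on persist(), and the new getObject(key, file) pulls that object back into a local file on recovery. Below is a minimal sketch of that round trip. It is written against the interface exactly as declared in this commit, but the driver class, the helper instance, the key, and the temp-file handling are hypothetical stand-ins, not code from the commit.

    import java.io.ByteArrayInputStream;
    import java.io.File;
    import java.io.IOException;
    import java.io.InputStream;

    // Hypothetical driver: 's3AccessHelper' can be any S3AccessHelper
    // implementation, e.g. a HadoopS3AccessHelper wired to an S3AFileSystem.
    final class InProgressPartRoundTrip {

        static File stageAndRecover(S3AccessHelper s3AccessHelper, String key) throws IOException {
            // Stage the in-progress part as a plain object (what persist() does).
            final byte[] data = "bytes of the in-progress part".getBytes();
            try (InputStream in = new ByteArrayInputStream(data)) {
                s3AccessHelper.putObject(key, in, data.length);
            }

            // On recovery, download the object into a local temp file.
            final File target = File.createTempFile("flink-recovery", ".tmp");
            final long numBytes = s3AccessHelper.getObject(key, target);

            // Same sanity check the factory now performs: the byte count
            // reported by getObject() must match what landed on disk.
            if (numBytes != target.length()) {
                throw new IOException("Downloaded " + numBytes + " bytes, but file has " + target.length());
            }
            return target;
        }
    }

Because getObject() now owns the byte-counting copy loop, the factory's second check against OffsetAwareOutputStream#getLength() had nothing left to compare and was dropped along with the @VisibleForTesting download helper.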
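One design note on HadoopS3AccessHelper.getObject(): it streams through the S3AFileSystem with a manual 32 KB copy loop rather than calling the AWS SDK directly. With raw access to an AmazonS3 client, the v1 SDK can download straight to a file in one call; the sketch below shows that alternative purely for comparison. The client and bucket name it assumes are not exposed by the Hadoop-based helper in this commit.

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.model.GetObjectRequest;
    import com.amazonaws.services.s3.model.ObjectMetadata;

    import java.io.File;

    // Assumed: a raw AmazonS3 client and the bucket name, neither of which
    // the S3AFileSystem-backed helper in this commit hands out.
    final class SdkDownloadSketch {

        static long getObject(AmazonS3 s3Client, String bucket, String key, File targetLocation) {
            // The v1 SDK writes the object body straight into the target file.
            final ObjectMetadata metadata =
                    s3Client.getObject(new GetObjectRequest(bucket, key), targetLocation);
            return metadata.getContentLength();
        }
    }

Routing through S3AFileSystem instead keeps the helper on Hadoop's credential, endpoint, and retry configuration, which is presumably why the copy loop lives there.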
