SinghAsDev commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789168709
##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -311,16 +344,39 @@ private void completeUploads() {
.bucket(location.bucket())
.key(location.key());
+ if (isEtagCheckEnabled) {
+ requestBuilder.contentMD5(BinaryUtils.toBase64(completeMessageDigest.digest()));
+ }
+
S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
S3RequestUtil.configurePermission(awsProperties, requestBuilder);
- s3.putObject(requestBuilder.build(), RequestBody.fromInputStream(contentStream, contentLength));
+ PutObjectRequest putObjectRequest = requestBuilder.build();
+ try {
+ s3.putObject(putObjectRequest, RequestBody.fromInputStream(contentStream, contentLength));
+ } catch (UncheckedIOException uncheckedIOException) {
+ logS3RequestAndThrow(uncheckedIOException, putObjectRequest.toString());
+ }
} else {
uploadParts();
completeMultiPartUpload();
}
}
+ private void closeStream() throws IOException {
+ if (isEtagCheckEnabled) {
+ if (stagingFiles.size() > 0) {
+ stagingFiles.get(stagingFiles.size() - 1).setDigest(currentPartMessageDigest.digest());
+ }
+ }
+ stream.close();
+ }
+
+ private static void logS3RequestAndThrow(UncheckedIOException uncheckedIOException, String requestString) {
+ LOG.error("S3 Request Failure: {}", requestString);
+ throw uncheckedIOException;
Review comment:
Makes sense, I can let the callers throw the exceptions directly.
@jackye1995 regarding the log message: the sensitive parts of the request, such as
`SSECustomerKey`, are already redacted. I also feel that seeing a bare exception
like ProtocolException without much context only confuses end users.
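A minimal sketch of the direction agreed on here, where the helper only logs and each caller throws the exception itself. It uses only the JDK: `RequestFailureLogging`, `logRequestFailure`, and `callAndLogOnFailure` are hypothetical names, `java.util.logging` stands in for the SLF4J `LOG` in `S3OutputStream`, and the request is represented by an already-redacted description string rather than a real `PutObjectRequest`.

```java
import java.io.UncheckedIOException;
import java.util.function.Supplier;
import java.util.logging.Logger;

final class RequestFailureLogging {
  // java.util.logging stands in for the SLF4J logger used in S3OutputStream.
  private static final Logger LOG = Logger.getLogger(RequestFailureLogging.class.getName());

  private RequestFailureLogging() {}

  // Only logs; it does not throw, so each caller keeps an explicit `throw`
  // and the compiler can see that control does not continue past the catch.
  static void logRequestFailure(String redactedRequest) {
    LOG.severe("S3 Request Failure: " + redactedRequest);
  }

  // Example caller: runs the request, logs on failure, and rethrows the
  // original exception instance so callers still see the same type.
  static <T> T callAndLogOnFailure(Supplier<T> call, String redactedRequest) {
    try {
      return call.get();
    } catch (UncheckedIOException e) {
      logRequestFailure(redactedRequest);
      throw e;
    }
  }
}
```

Keeping the `throw` at the call site means no dummy `return` is needed after the `catch` in methods that return a value.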
##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -172,15 +187,23 @@ public void write(byte[] b, int off, int len) throws IOException {
private void newStream() throws IOException {
if (stream != null) {
- stream.close();
+ closeStream();
}
createStagingDirectoryIfNotExists();
currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory);
currentStagingFile.deleteOnExit();
- stagingFiles.add(currentStagingFile);
- stream = new CountingOutputStream(new BufferedOutputStream(new FileOutputStream(currentStagingFile)));
+ stagingFiles.add(new FileAndDigest(currentStagingFile));
+
+ if (isEtagCheckEnabled) {
+ currentPartMessageDigest.reset();
Review comment:
This is what I had before, but I changed it to keep the digest bytes instead,
per @rdblue's comment.
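The per-part approach described here (reset one shared `MessageDigest` when a new part starts, then keep only the finished digest bytes) can be sketched with the JDK's `DigestOutputStream`. `PartDigests`, `newPart`, `closePart`, and `contentMd5` are hypothetical names, and any `OutputStream` can stand in for a staging file. The Base64 encoding of the raw 16-byte MD5 is the form S3 expects in a `Content-MD5` header.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

final class PartDigests {
  private final MessageDigest partDigest;                      // shared across parts
  private final List<byte[]> finishedDigests = new ArrayList<>(); // one entry per closed part
  private DigestOutputStream current;

  PartDigests() throws NoSuchAlgorithmException {
    this.partDigest = MessageDigest.getInstance("MD5");
  }

  // Starts a new part: reset the shared digest and route writes through it.
  OutputStream newPart(OutputStream partStream) {
    partDigest.reset();
    current = new DigestOutputStream(partStream, partDigest);
    return current;
  }

  // Closes the current part and keeps only its digest bytes.
  void closePart() throws IOException {
    current.close();
    finishedDigests.add(partDigest.digest()); // digest() also resets the MessageDigest
  }

  // Content-MD5 value for a part: Base64 of the raw 16-byte MD5 digest.
  String contentMd5(int partIndex) {
    return Base64.getEncoder().encodeToString(finishedDigests.get(partIndex));
  }
}
```

Because `digest()` already resets the `MessageDigest`, the explicit `reset()` in `newPart` only matters if a part is abandoned before `closePart` runs.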
##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -360,4 +416,28 @@ protected void finalize() throws Throwable {
LOG.warn("Unclosed output stream created by:\n\t{}", trace);
}
}
+
+ private static class FileAndDigest {
+ private final File file;
+ private byte[] digest;
+ private boolean isDigestSet = false;
+
+ FileAndDigest(File file) {
+ this.file = file;
+ }
+
+ File file() {
+ return file;
+ }
+
+ void setDigest(byte[] digest) {
Review comment:
It needs to be mutable if we are not passing the MessageDigest. Earlier I had it
immutable because I was passing the MessageDigest, which I could update as bytes
were written. The file is created in newStream, but the digest bytes are only
available later, when the stream is closed.
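A sketch of why the holder ends up mutable: the file is fixed at construction (in `newStream`), while the digest is attached later (in `closeStream`). This version is hypothetical: it replaces the `isDigestSet` flag with a null check, and the set-once guard, the defensive copies, and the `hasDigest()`/`digest()` accessors are illustrative additions, not part of the PR.

```java
import java.io.File;
import java.util.Arrays;

final class FileAndDigest {
  private final File file; // known when the staging file is created
  private byte[] digest;   // known only once the part's stream is closed

  FileAndDigest(File file) {
    this.file = file;
  }

  File file() {
    return file;
  }

  // Set exactly once, after the part's stream has been closed.
  void setDigest(byte[] digest) {
    if (this.digest != null) {
      throw new IllegalStateException("Digest already set for " + file);
    }
    this.digest = Arrays.copyOf(digest, digest.length);
  }

  boolean hasDigest() {
    return digest != null;
  }

  // Returns a copy so callers cannot modify the stored digest.
  byte[] digest() {
    if (digest == null) {
      throw new IllegalStateException("Digest not set yet for " + file);
    }
    return Arrays.copyOf(digest, digest.length);
  }
}
```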
##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -218,41 +241,50 @@ private void uploadParts() {
stagingFiles.stream()
// do not upload the file currently being written
- .filter(f -> closed || !f.equals(currentStagingFile))
+ .filter(f -> closed || !f.file().equals(currentStagingFile))
// do not upload any files that have already been processed
- .filter(Predicates.not(multiPartMap::containsKey))
- .forEach(f -> {
+ .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
+ .forEach(fileAndDigest -> {
+ File file = fileAndDigest.file();
Review comment:
Using `f` here would cause a `LocalVariableName` checkstyle failure. See the
previous discussion:
https://github.com/apache/iceberg/pull/3813#discussion_r788252447
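The two filters in `uploadParts()` can be sketched in isolation, keeping the descriptive lambda parameter name `fileAndDigest`. Assumptions: `PendingParts`, `Staged`, and `pending` are hypothetical, the JDK 11 `Predicate.not` replaces Guava's `Predicates.not`, and the method collects the remaining files instead of uploading them.

```java
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class PendingParts {
  // Minimal stand-in for the (file, digest) holder; only the file matters here.
  static final class Staged {
    private final File file;

    Staged(File file) {
      this.file = file;
    }

    File file() {
      return file;
    }
  }

  private PendingParts() {}

  static List<File> pending(
      List<Staged> staged, File current, boolean closed, Map<File, ?> uploaded) {
    return staged.stream()
        // do not upload the file currently being written, unless the stream is closed
        .filter(fileAndDigest -> closed || !fileAndDigest.file().equals(current))
        // do not upload any files that have already been processed
        .filter(Predicate.not(fileAndDigest -> uploaded.containsKey(fileAndDigest.file())))
        .map(Staged::file)
        .collect(Collectors.toList());
  }
}
```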