There's a broader issue here, which is "S3A support for buckets with Object Lock".
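For context: the error below is S3 rejecting each upload-part request because it lacks a Content-MD5 header. At the SDK level, attaching one is conceptually simple: compute the MD5 of the part body and set it on the request before it is signed. A minimal sketch against the v1 AWS SDK follows; the Md5PartUploader class and uploadPartWithMd5 helper are illustrative only, not anything that exists in S3AFileSystem today:

    import java.io.ByteArrayInputStream;

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.model.UploadPartRequest;
    import com.amazonaws.services.s3.model.UploadPartResult;
    import com.amazonaws.util.Md5Utils;

    /** Illustrative sketch only; not part of S3A. */
    public final class Md5PartUploader {

      /** Upload one part of a multipart upload with a Content-MD5 header. */
      public static UploadPartResult uploadPartWithMd5(
          AmazonS3 s3, String bucket, String key,
          String uploadId, int partNumber, byte[] partData) {
        // Base64-encoded MD5 of the part body; the SDK sends it as the
        // Content-MD5 header, which Object Lock-enabled buckets require.
        String md5 = Md5Utils.md5AsBase64(partData);
        UploadPartRequest request = new UploadPartRequest()
            .withBucketName(bucket)
            .withKey(key)
            .withUploadId(uploadId)
            .withPartNumber(partNumber)
            .withPartSize(partData.length)
            .withInputStream(new ByteArrayInputStream(partData))
            .withMD5Digest(md5);
        return s3.uploadPart(request);
      }
    }

In S3A the digest would presumably have to be computed per buffered block inside S3ABlockOutputStream as each part is queued for upload, rather than over a single byte array as above.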
I suspect that there will be more to this than just setting an MD5 checksum on file uploads. AFAIK, nobody has done this or is working on the feature; there are so many other issues that nobody is going to pick it up unless it is critical for them: https://issues.apache.org/jira/browse/HADOOP-16829

Given that you are the one with this problem, I'm afraid that leaves you. We'll take a patch with docs and tests; the submission process is described at: https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/testing.html

steve

On Fri, 17 Jul 2020 at 21:10, nikita Balakrishnan <nikitabal...@gmail.com> wrote:

> Hey team,
>
> I'm developing a system where we are trying to sink to an immutable S3
> bucket as part of a Flink job. This bucket has server-side encryption set
> to KMS. The DataStream sink works perfectly fine when I don't use the
> immutable bucket, but when I use an immutable bucket, I get exceptions
> about multipart upload failures. It says we need to enable MD5 hashing
> for the put object to work.
>
> According to the AWS S3 documentation for immutable buckets (with object
> locks), a Content-MD5 header is mandatory -
> https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
> "The Content-MD5 header is required for any request to upload an object
> with a retention period configured using Amazon S3 Object Lock. For more
> information about Amazon S3 Object Lock, see Amazon S3 Object Lock Overview
> <https://docs.aws.amazon.com/AmazonS3/latest/dev/object-lock-overview.html>
> in the *Amazon Simple Storage Service Developer Guide*."
>
> My question to the Flink team was: "How do I set this HTTP header while
> sinking? I checked most of the documentation and tried going through the
> source code too, but couldn't really find a provision where we could set
> the headers for a request that goes in as a sink." They got back asking me
> to set fs.s3a.etag.checksum.enabled: true, but that didn't work either,
> and then they redirected me to the Hadoop team. Can you please help me
> out here?
>
> The one thing I'm unclear about is how the system knows that we're using
> MD5 hashing when we enable checksums. Is there some way to specify that?
> It feels like I'm missing something.
>
> Here's the stack trace:
>
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
>   at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1520)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$10(StreamTask.java:1509)
>   at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
>   at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>   at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: org.apache.flink.streaming.runtime.tasks.TimerException: java.io.IOException: Uploading parts failed
>   ... 11 common frames omitted
> Caused by: java.io.IOException: Uploading parts failed
>   at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartUploadToComplete(RecoverableMultiPartUploadImpl.java:231)
>   at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartsUpload(RecoverableMultiPartUploadImpl.java:215)
>   at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetRecoverable(RecoverableMultiPartUploadImpl.java:151)
>   at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:123)
>   at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:56)
>   at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:167)
>   at org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter.closeForCommit(PartFileWriter.java:71)
>   at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:239)
>   at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:338)
>   at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:304)
>   at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.onProcessingTime(StreamingFileSink.java:439)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1518)
>   ... 10 common frames omitted
> Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: upload part on raw_events/xxx/xxx/2020/07/15/20/archived-2-0.txt: com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 HTTP header is required for Put Part requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: xxx; S3 Extended Request ID: xxxx), S3 Extended Request ID: xxxxxx :InvalidRequest: Content-MD5 HTTP header is required for Put Part requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: xxxx; S3 Extended Request ID: xxxx)
>   at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:212)
>   at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
>   at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
>   at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
>   at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
>   at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
>   at org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
>   at org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
>   at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:73)
>   at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:318)
>   at org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   ... 1 common frames omitted
> Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 HTTP header is required for Put Part requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: xxxxx; S3 Extended Request ID: xxxx)
>   at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
>   at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
>   at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
>   at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>   at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>   at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>   at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>   at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>   at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>   at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>   at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>   at com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
>   at com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
>   at org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
>   at org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
>   at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>   ... 12 common frames omitted
>
> Thanks!
>
> Regards,
> Nikita
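PS: on the fs.s3a.etag.checksum.enabled option you were pointed at: as far as I know, it only changes what getFileChecksum() reports back (the object's etag, rather than nothing); it does not add a Content-MD5 header to PUT or upload-part requests, which would explain why enabling it made no difference here. For anyone experimenting, a minimal snippet showing how the option is set and what it affects (EtagChecksumProbe and the bucket/key names are placeholders):

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileChecksum;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class EtagChecksumProbe {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Expose S3 object etags through getFileChecksum(); this changes
        // what is reported on read, not the headers sent on upload.
        conf.setBoolean("fs.s3a.etag.checksum.enabled", true);
        FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
        FileChecksum sum = fs.getFileChecksum(new Path("s3a://example-bucket/some/object"));
        System.out.println("checksum: " + sum);  // etag-based checksum, or null if disabled
      }
    }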