Hey team,

I’m developing a system in which a Flink job sinks to an immutable S3
bucket. The bucket has server-side encryption set to KMS. The DataStream
sink works fine against a regular bucket, but against the immutable bucket
I get exceptions about multipart upload failures. The error says a
Content-MD5 header is required for the upload-part requests to succeed.

According to the AWS S3 documentation for immutable buckets (with Object
Lock), a Content-MD5 header is mandatory -
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
“The Content-MD5 header is required for any request to upload an object
with a retention period configured using Amazon S3 Object Lock. For more
information about Amazon S3 Object Lock, see Amazon S3 Object Lock Overview
<https://docs.aws.amazon.com/AmazonS3/latest/dev/object-lock-overview.html> in
the *Amazon Simple Storage Service Developer Guide*.”

My question to the Flink team was: "How do I set this HTTP header while
sinking? I checked most of the documentation and went through the source
code too, but couldn’t find any provision for setting the headers on a
request issued by a sink." They replied asking me to set
fs.s3a.etag.checksum.enabled: true, but that didn’t work either. They then
redirected me to the Hadoop team. Can you please help me out here?

The one thing I’m unclear about is how the system knows that we’re using
MD5 hashing when we enable the checksum option. Is there some way to
specify that? It feels like I’m missing something there.
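
For reference, my understanding is that the Content-MD5 value is the
Base64 encoding of the raw 16-byte MD5 digest of the request body (not the
hex string). Below is a minimal sketch of how that value could be computed
for one part, using only the JDK. The class name ContentMd5Sketch and the
sample payload are made up for illustration, and the sketch does not show
how the value would be attached to the request that S3A sends, which is
exactly the part I can’t find a hook for.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical helper, not part of Flink or Hadoop.
public class ContentMd5Sketch {

    // Returns the Base64-encoded MD5 digest of the given bytes, which is
    // the format the Content-MD5 HTTP header expects.
    static String contentMd5(byte[] body) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            return Base64.getEncoder().encodeToString(md5.digest(body));
        } catch (NoSuchAlgorithmException e) {
            // MD5 is a required algorithm on every Java platform.
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    public static void main(String[] args) {
        // Placeholder payload standing in for the bytes of one upload part.
        byte[] part = "example part payload".getBytes(StandardCharsets.UTF_8);
        System.out.println("Content-MD5: " + contentMd5(part));
    }
}
```

If I were calling the AWS SDK directly, I assume this is the value I would
hand to the upload-part request, but in my case the request is built inside
S3AFileSystem.uploadPart, as the stack trace below shows.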

Here’s the stack trace:

org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
exception while processing timer.
at
org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1520)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$10(StreamTask.java:1509)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.streaming.runtime.tasks.TimerException:
java.io.IOException: Uploading parts failed
... 11 common frames omitted
Caused by: java.io.IOException: Uploading parts failed
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartUploadToComplete(RecoverableMultiPartUploadImpl.java:231)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartsUpload(RecoverableMultiPartUploadImpl.java:215)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetRecoverable(RecoverableMultiPartUploadImpl.java:151)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:123)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:56)
at
org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:167)
at
org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter.closeForCommit(PartFileWriter.java:71)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:239)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:338)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:304)
at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.onProcessingTime(StreamingFileSink.java:439)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1518)
... 10 common frames omitted
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: upload part on
raw_events/xxx/xxx/2020/07/15/20/archived-2-0.txt:
com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 HTTP header
is required for Put Part requests with Object Lock parameters (Service:
Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: xxx;
S3 Extended Request ID: xxxx), S3 Extended Request ID: xxxxxx
:InvalidRequest: Content-MD5 HTTP header is required for Put Part requests
with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error
Code: InvalidRequest; Request ID: xxxx; S3 Extended Request ID: xxxx)
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:212)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
at
org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
at
org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
at
org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:73)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:318)
at
org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 common frames omitted
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5
HTTP header is required for Put Part requests with Object Lock parameters
(Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request
ID: xxxxx; S3 Extended Request ID: xxxx)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
at
com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
at
com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
at
org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
... 12 common frames omitted

Thanks!

Regards,
Nikita
