[ https://issues.apache.org/jira/browse/HADOOP-19221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868828#comment-17868828 ]
ASF GitHub Bot commented on HADOOP-19221:
-----------------------------------------
mukund-thakur commented on code in PR #6938:
URL: https://github.com/apache/hadoop/pull/6938#discussion_r1692281548
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/UploadContentProviders.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.ContentStreamProvider;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.store.ByteBufferInputStream;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+import static org.apache.hadoop.util.functional.FunctionalIO.uncheckIOExceptions;
+
+/**
+ * Implementations of {@code software.amazon.awssdk.http.ContentStreamProvider}.
+ * <p>
+ * These are required to ensure that retries of multipart uploads are reliable,
+ * while also avoiding memory copy/consumption overhead.
+ * <p>
+ * For these reasons the providers built into the AWS SDK are not used.
+ * <p>
+ * See HADOOP-19221 for details.
+ */
+public final class UploadContentProviders {
+
+ public static final Logger LOG = LoggerFactory.getLogger(UploadContentProviders.class);
+
+ private UploadContentProviders() {
+ }
+
+ /**
+ * Create a content provider from a file.
+ * @param file file to read.
+ * @param offset offset in file.
+ * @param size size of the data.
+ * @return the provider
+ * @throws IllegalArgumentException if the offset is negative.
+ */
+ public static BaseContentProvider<BufferedInputStream> fileContentProvider(
+ File file,
+ long offset,
+ final int size) {
+
+ return new FileWithOffsetContentProvider(file, offset, size);
+ }
+
+ /**
+ * Create a content provider from a byte buffer.
+ * The buffer is not copied and MUST NOT be modified while
+ * the upload is taking place.
+ * @param byteBuffer buffer to read.
+ * @param size size of the data.
+ * @return the provider
+ * @throws IllegalArgumentException if the arguments are invalid.
+ * @throws NullPointerException if the buffer is null
+ */
+ public static BaseContentProvider<ByteBufferInputStream> byteBufferContentProvider(
+ final ByteBuffer byteBuffer,
+ final int size) {
+
+ return new ByteBufferContentProvider(byteBuffer, size);
+ }
+
+ /**
+ * Create a content provider for all or part of a byte array.
+ * The buffer is not copied and MUST NOT be modified while
+ * the upload is taking place.
+ * @param bytes buffer to read.
+ * @param offset offset in buffer.
+ * @param size size of the data.
+ * @return the provider
+ * @throws IllegalArgumentException if the arguments are invalid.
+ * @throws NullPointerException if the buffer is null.
+ */
+ public static BaseContentProvider<ByteArrayInputStream> byteArrayContentProvider(
+ final byte[] bytes, final int offset, final int size) {
+ return new ByteArrayContentProvider(bytes, offset, size);
+ }
+
+ /**
+ * Create a content provider for all of a byte array.
+ * @param bytes buffer to read.
+ * @return the provider
+ * @throws IllegalArgumentException if the arguments are invalid.
+ * @throws NullPointerException if the buffer is null.
+ */
+ public static BaseContentProvider<ByteArrayInputStream> byteArrayContentProvider(
+ final byte[] bytes) {
+ return byteArrayContentProvider(bytes, 0, bytes.length);
+ }
+
+ /**
+ * Base class for content providers; tracks the number of times a stream
+ * has been opened.
+ * @param <T> type of stream created.
+ */
+ @VisibleForTesting
+ public static abstract class BaseContentProvider<T extends InputStream>
+ implements ContentStreamProvider, Closeable {
+
+ /**
+ * Size of the data.
+ */
+ private final int size;
+
+ /**
+ * How many times has a stream been created?
+ */
+ private int streamCreationCount;
+
+ /**
+ * Current stream. Null if not opened yet.
+ * When {@link #newStream()} is called, this is set to the new value.
+ * Note: when the input stream itself is closed, this reference is not updated.
+ * Therefore this field not being null does not imply that the stream is open.
+ */
+ private T currentStream;
+
+ /**
+ * Constructor.
+ * @param size size of the data. Must be non-negative.
+ */
+ protected BaseContentProvider(final int size) {
+ checkArgument(size >= 0, "size is negative: %s", size);
+ this.size = size;
+ }
+
+ /**
+ * Close the current stream.
+ */
+ @Override
+ public void close() {
+ cleanupWithLogger(LOG, getCurrentStream());
+ setCurrentStream(null);
+ }
+
+ /**
+ * Create a new stream, closing any which is already open.
+ * <p>
+ * Logs if this is a subsequent creation, as it implies a failure of the first attempt.
+ * @return the new stream
+ */
+ @Override
+ public final InputStream newStream() {
+ close();
+ streamCreationCount++;
+ if (streamCreationCount > 1) {
+ LOG.info("Stream created more than once: {}", this);
+ }
+ return setCurrentStream(createNewStream());
+ }
+
+ /**
+ * Override point for subclasses to create their new streams.
+ * @return a stream
+ */
+ protected abstract T createNewStream();
+
+ /**
+ * How many times has a stream been created?
+ * @return stream creation count
+ */
+ public int getStreamCreationCount() {
+ return streamCreationCount;
+ }
+
+ /**
+ * Size as set by constructor parameter.
+ * @return size of the data
+ */
+ public int getSize() {
+ return size;
+ }
+
+ /**
+ * Current stream.
+ * When {@link #newStream()} is called, this is set to the new value,
+ * after closing the previous one.
+ * <p>
+ * Why? The AWS SDK implementations do this, so there
+ * is an implication that it is needed to avoid keeping streams
+ * open on retries.
+ * @return the current stream, or null if none is open.
+ */
+ protected T getCurrentStream() {
+ return currentStream;
+ }
+
+ /**
+ * Set the current stream.
+ * @param stream the new stream
+ * @return the current stream.
+ */
+ protected T setCurrentStream(T stream) {
+ this.currentStream = stream;
+ return stream;
+ }
+
+ @Override
+ public String toString() {
+ return "BaseContentProvider{" +
+ "size=" + size +
+ ", streamCreationCount=" + streamCreationCount +
+ ", currentStream=" + currentStream +
+ '}';
+ }
+ }
+
+ /**
+ * Content provider for a file with an offset.
+ */
+ private static final class FileWithOffsetContentProvider
+ extends BaseContentProvider<BufferedInputStream> {
+
+ /**
+ * File to read.
+ */
+ private final File file;
+
+ /**
+ * Offset in file.
+ */
+ private final long offset;
+
+ /**
+ * Constructor.
+ * @param file file to read.
+ * @param offset offset in file.
+ * @param size size of the data.
+ * @throws IllegalArgumentException if the offset is negative.
+ */
+ private FileWithOffsetContentProvider(final File file,
+ final long offset,
+ final int size) {
+
+ super(size);
+ this.file = requireNonNull(file);
+ checkArgument(offset >= 0, "Offset is negative: %s", offset);
+ this.offset = offset;
+ }
+
+ /**
+ * Create a new stream.
+ * @return a stream at the start of the offset in the file
+ * @throws UncheckedIOException on IO failure.
+ */
+ @Override
+ protected BufferedInputStream createNewStream() throws UncheckedIOException {
+ // create the stream, seek to the offset.
+ final FileInputStream fis = uncheckIOExceptions(() -> {
+ final FileInputStream f = new FileInputStream(file);
+ f.getChannel().position(offset);
+ return f;
+ });
+ return setCurrentStream(new BufferedInputStream(fis));
+ }
+
+ @Override
+ public String toString() {
+ return "FileWithOffsetContentProvider{" +
+ "file=" + file +
+ ", offset=" + offset +
+ "} " + super.toString();
+ }
+
+ }
+
+ /**
+ * Create a content provider for a byte buffer.
+ * Uses {@link ByteBufferInputStream} to read the data.
+ */
+ private static final class ByteBufferContentProvider
+ extends BaseContentProvider<ByteBufferInputStream> {
+
+ /**
+ * The buffer which will be read; on or off heap.
+ */
+ private final ByteBuffer blockBuffer;
+
+ /**
+ * The position in the buffer at the time the provider was created.
+ */
+ private final int initialPosition;
+
+ /**
+ * Constructor.
+ * @param blockBuffer buffer to read.
+ * @param size size of the data.
+ * @throws IllegalArgumentException if the arguments are invalid.
+ * @throws NullPointerException if the buffer is null
+ */
+ private ByteBufferContentProvider(final ByteBuffer blockBuffer, int size) {
+ super(size);
+ this.blockBuffer = blockBuffer;
+ this.initialPosition = blockBuffer.position();
+ }
+
+ @Override
+ protected ByteBufferInputStream createNewStream() {
+ // set the buffer up for reading from the beginning
+ blockBuffer.limit(initialPosition);
Review Comment:
Wondering why setting the limit is important?
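For context on this question, here is a minimal standalone sketch (not part of the patch; the class name and values are illustrative) of the `ByteBuffer` semantics involved: reading advances the buffer's position, so before a retry the provider has to restore both the position and the limit, otherwise a second read would find no remaining data. Capturing `initialPosition` at construction time is what makes that restore possible.

```java
import java.nio.ByteBuffer;

// Illustrative only: shows why both limit and position must be reset before re-reading.
public class ByteBufferRewindSketch {
  public static void main(String[] args) {
    ByteBuffer buffer = ByteBuffer.allocate(16);
    buffer.put(new byte[] {1, 2, 3, 4});       // position is now 4, the end of the data
    int initialPosition = buffer.position();   // captured as the data length

    // first (failed) upload attempt reads the block
    buffer.limit(initialPosition);             // cap reads at the end of the valid data
    buffer.position(0);                        // rewind to the start
    while (buffer.hasRemaining()) {
      buffer.get();                            // position advances up to the limit
    }

    // a retry must restore limit and position again; without this, remaining() is 0
    buffer.limit(initialPosition);
    buffer.position(0);
    System.out.println("bytes available for retry: " + buffer.remaining());  // prints 4
  }
}
```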
##########
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md:
##########
@@ -501,7 +543,43 @@ endpoint and region like the following:
<value>${sts.region}</value>
</property>
```
+## <a name="500_internal_error"></a> HTTP 500 status code "We encountered an internal error"
+
+```
+We encountered an internal error. Please try again.
+(Service: S3, Status Code: 500, Request ID: <id>, Extended Request ID: <extended-id>)
+```
+
+The [status code 500](https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500) indicates
+the S3 store has reported an internal problem.
+When raised by Amazon S3, we believe this is a rare sign of a problem within the S3 system
+or another part of the cloud infrastructure on which it depends.
+Retrying _should_ make it go away.
+
+The 500 error is considered retryable by the AWS SDK, which will have already
+tried it `fs.s3a.attempts.maximum` times before reaching the S3A client -which
Review Comment:
nit: retried?
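For reference, a hedged configuration example (the value is purely illustrative, not a recommendation) showing where that SDK-level retry count is set, e.g. in `core-site.xml`:

```xml
<property>
  <name>fs.s3a.attempts.maximum</name>
  <!-- illustrative value: how many attempts the AWS SDK makes before the failure surfaces to the S3A client -->
  <value>10</value>
</property>
```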
> S3A: Unable to recover from failure of multipart block upload attempt "Status
> Code: 400; Error Code: RequestTimeout"
> --------------------------------------------------------------------------------------------------------------------
>
> Key: HADOOP-19221
> URL: https://issues.apache.org/jira/browse/HADOOP-19221
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Affects Versions: 3.4.0
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Priority: Major
> Labels: pull-request-available
>
> If a multipart PUT request fails for some reason (e.g. network error) then
> all subsequent retry attempts fail with a 400 Response and ErrorCode
> RequestTimeout.
> {code}
> Your socket connection to the server was not read from or written to within
> the timeout period. Idle connections will be closed. (Service: Amazon S3;
> Status Code: 400; Error Code: RequestTimeout; Request ID:; S3 Extended
> Request ID:
> {code}
> The list of suppressed exceptions contains the root cause (the initial
> failure was a 500); all retries failed to upload properly from the source
> input stream {{RequestBody.fromInputStream(fileStream, size)}}.
> Hypothesis: the mark/reset stuff doesn't work for input streams. On the v1
> SDK we would build a multipart block upload request passing in (file, offset,
> length); the way we are now doing this doesn't recover.
> probably fixable by providing our own {{ContentStreamProvider}}
> implementations for
> # file + offset + length
> # bytebuffer
> # byte array
> The SDK does have explicit support for the memory ones, but they copy the
> data blocks first. We don't want that, as it would double the memory
> requirements of active blocks.
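A minimal, hedged sketch of the direction described above (class and method names are illustrative, not the actual patch): a custom `ContentStreamProvider` re-opens the file at the block's offset on every `newStream()` call, and the request body is built from the provider rather than from a single shared input stream, so a retry never depends on mark/reset working.

```java
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.http.ContentStreamProvider;

// Sketch only: each retry gets a fresh stream positioned at the block offset.
public class FileOffsetProviderSketch implements ContentStreamProvider {

  private final File file;
  private final long offset;

  public FileOffsetProviderSketch(File file, long offset) {
    this.file = file;
    this.offset = offset;
  }

  @Override
  public InputStream newStream() {
    try {
      FileInputStream in = new FileInputStream(file);
      in.getChannel().position(offset);   // seek to the start of this block
      return new BufferedInputStream(in);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Hypothetical usage: build the upload body from the provider, not a raw stream. */
  public static RequestBody uploadBody(File file, long offset, long length) {
    return RequestBody.fromContentProvider(
        new FileOffsetProviderSketch(file, offset),
        length,
        "application/octet-stream");
  }
}
```

Because the provider owns stream creation, no data is buffered or copied, which is the property that makes it preferable to the SDK's built-in in-memory providers mentioned above.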