mukund-thakur commented on code in PR #6938:
URL: https://github.com/apache/hadoop/pull/6938#discussion_r1692281548


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/UploadContentProviders.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.ContentStreamProvider;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.store.ByteBufferInputStream;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+import static 
org.apache.hadoop.util.functional.FunctionalIO.uncheckIOExceptions;
+
+/**
+ * Implementations of {@code 
software.amazon.awssdk.http.ContentStreamProvider}.
+ * <p>
+ * These are required to ensure that retries of multipart uploads are reliable,
+ * while also avoiding memory copy/consumption overhead.
+ * <p>
+ * For these reasons the providers built in to the AWS SDK are not used.
+ * <p>
+ * See HADOOP-19221 for details.
+ */
+public final class UploadContentProviders {
+
+  public static final Logger LOG = 
LoggerFactory.getLogger(UploadContentProviders.class); // used by BaseContentProvider when closing streams and logging repeated stream creation
+
+  private UploadContentProviders() { // private: providers are only created through the static factory methods
+  }
+
+  /**
+   * Create a content provider which reads a file starting at a given offset.
+   * <p>
+   * Each stream created by the provider is a new {@code BufferedInputStream}
+   * over the file, with the underlying channel positioned at {@code offset}.
+   * @param file file to read; must not be null.
+   * @param offset offset in the file; must be non-negative.
+   * @param size size of the data; must be non-negative.
+   * @return the provider
+   * @throws IllegalArgumentException if the offset or the size is negative.
+   * @throws NullPointerException if the file is null.
+   */
+  public static BaseContentProvider<BufferedInputStream> fileContentProvider(
+      File file,
+      long offset,
+      final int size) {
+
+    return new FileWithOffsetContentProvider(file, offset, size);
+  }
+
+  /**
+   * Create a content provider from a byte buffer.
+   * The buffer is not copied and MUST NOT be modified while
+   * the upload is taking place.
+   * <p>
+   * The position of the buffer at the time of this call is recorded
+   * by the provider.
+   * @param byteBuffer buffer to read; must not be null.
+   * @param size size of the data; must be non-negative.
+   * @return the provider
+   * @throws IllegalArgumentException if the size is negative.
+   * @throws NullPointerException if the buffer is null
+   */
+  public static BaseContentProvider<ByteBufferInputStream> 
byteBufferContentProvider(
+      final ByteBuffer byteBuffer,
+      final int size) {
+
+    return new ByteBufferContentProvider(byteBuffer, size);
+  }
+
+  /**
+   * Create a content provider for all or part of a byte array.
+   * The buffer is not copied and MUST NOT be modified while
+   * the upload is taking place.
+   * <p>
+   * The arguments are passed unchanged to the
+   * {@code ByteArrayContentProvider} constructor, which is where
+   * any validation takes place.
+   * @param bytes buffer to read.
+   * @param offset offset in buffer.
+   * @param size size of the data.
+   * @return the provider
+   * @throws IllegalArgumentException if the arguments are invalid.
+   * @throws NullPointerException if the buffer is null.
+   */
+  public static BaseContentProvider<ByteArrayInputStream> 
byteArrayContentProvider(
+      final byte[] bytes, final int offset, final int size) {
+    return new ByteArrayContentProvider(bytes, offset, size);
+  }
+
+  /**
+   * Create a content provider for all of a byte array.
+   * <p>
+   * Equivalent to calling
+   * {@link #byteArrayContentProvider(byte[], int, int)} with an offset of 0
+   * and a size of {@code bytes.length}; the contract of that method
+   * (the buffer is not copied and MUST NOT be modified while the upload
+   * is taking place) applies here too.
+   * @param bytes buffer to read.
+   * @return the provider
+   * @throws IllegalArgumentException if the arguments are invalid.
+   * @throws NullPointerException if the buffer is null; raised when
+   * the array length is read.
+   */
+  public static BaseContentProvider<ByteArrayInputStream> 
byteArrayContentProvider(
+      final byte[] bytes) {
+    return byteArrayContentProvider(bytes, 0, bytes.length);
+  }
+
+  /**
+   * Base class for content providers; tracks the number of times a stream
+   * has been opened and holds a reference to the most recently created
+   * stream, so that it can be closed before the next one is created
+   * or when the provider itself is closed.
+   * <p>
+   * Subclasses implement {@link #createNewStream()};
+   * {@link #newStream()} is final and does the bookkeeping.
+   * <p>
+   * NOTE(review): the counter and the stream reference are plain fields
+   * with no synchronization in this class; presumably a provider is only
+   * used by one upload at a time -confirm.
+   * @param <T> type of stream created.
+   */
+  @VisibleForTesting
+  public static abstract class BaseContentProvider<T extends InputStream>
+      implements ContentStreamProvider, Closeable {
+
+    /**
+     * Size of the data, as passed to the constructor.
+     */
+    private final int size;
+
+    /**
+     * How many times has a stream been created?
+     * Incremented on every call to {@link #newStream()}.
+     */
+    private int streamCreationCount;
+
+    /**
+     * Current stream. Null if not opened yet, or after {@link #close()}.
+     * When {@link #newStream()} is called, the previous stream is closed
+     * and this is set to the new value.
+     * Note: when the input stream itself is closed, this reference is not
+     * updated.
+     * Therefore this field not being null does not imply that the stream is
+     * open.
+     */
+    private T currentStream;
+
+    /**
+     * Constructor.
+     * @param size size of the data. Must be non-negative.
+     * @throws IllegalArgumentException if the size is negative.
+     */
+    protected BaseContentProvider(final int size) {
+      checkArgument(size >= 0, "size is negative: %s", size);
+      this.size = size;
+    }
+
+    /**
+     * Close the current stream, if there is one, and clear the reference
+     * to it.
+     * <p>
+     * The stream is handed to {@code cleanupWithLogger()} along with
+     * {@link #LOG}; this is also invoked when no stream has been created
+     * yet, so that helper is presumably null-safe -confirm.
+     */
+    @Override
+    public void close() {
+      cleanupWithLogger(LOG, getCurrentStream());
+      setCurrentStream(null);
+    }
+
+    /**
+     * Create a new stream, closing any stream created by a previous call.
+     * <p>
+     * The sequence is: close the current stream; increment the creation
+     * counter; ask the subclass for a stream through
+     * {@link #createNewStream()}; save it as the current stream.
+     * <p>
+     * Logs at info if this is not the first stream created, as it implies
+     * a failure of the first attempt.
+     * @return the new stream
+     */
+    @Override
+    public final InputStream newStream() {
+      close();
+      streamCreationCount++;
+      if (streamCreationCount > 1) {
+        LOG.info("Stream created more than once: {}", this);
+      }
+      return setCurrentStream(createNewStream());
+    }
+
+    /**
+     * Override point for subclasses to create their new streams.
+     * Invoked by {@link #newStream()} after the previous stream has been
+     * closed and the creation counter updated.
+     * @return a stream
+     */
+    protected abstract T createNewStream();
+
+    /**
+     * How many times has a stream been created?
+     * @return stream creation count
+     */
+    public int getStreamCreationCount() {
+      return streamCreationCount;
+    }
+
+    /**
+     * Size as set by constructor parameter.
+     * @return size of the data
+     */
+    public int getSize() {
+      return size;
+    }
+
+    /**
+     * Current stream.
+     * When {@link #newStream()} is called, this is set to the new value,
+     * after closing the previous one.
+     * <p>
+     * Why? The AWS SDK implementations do this, so there
+     * is an implication that it is needed to avoid keeping streams
+     * open on retries.
+     * @return the most recently created stream, or null if no stream has
+     * been created or the provider has been closed. A non-null value does
+     * not imply that the stream is still open.
+     */
+    protected T getCurrentStream() {
+      return currentStream;
+    }
+
+    /**
+     * Set the current stream.
+     * The previous stream, if any, is not closed by this method.
+     * @param stream the new stream; null to clear the reference.
+     * @return the stream passed in.
+     */
+    protected T setCurrentStream(T stream) {
+      this.currentStream = stream;
+      return stream;
+    }
+
+    @Override
+    public String toString() {
+      return "BaseContentProvider{" +
+          "size=" + size +
+          ", streamCreationCount=" + streamCreationCount +
+          ", currentStream=" + currentStream +
+          '}';
+    }
+  }
+
+  /**
+   * Content provider for a file with an offset.
+   * Every stream created is a new {@code BufferedInputStream} over a newly
+   * opened {@code FileInputStream}, positioned at the offset.
+   */
+  private static final class FileWithOffsetContentProvider
+      extends BaseContentProvider<BufferedInputStream> {
+
+    /**
+     * File to read. Never null.
+     */
+    private final File file;
+
+    /**
+     * Offset in file. Never negative.
+     */
+    private final long offset;
+
+    /**
+     * Constructor.
+     * @param file file to read; must not be null.
+     * @param offset offset in file; must be non-negative.
+     * @param size size of the data; must be non-negative.
+     * @throws IllegalArgumentException if the offset or the size is negative.
+     * @throws NullPointerException if the file is null.
+     */
+    private FileWithOffsetContentProvider(final File file,
+        final long offset,
+        final int size) {
+
+      super(size);
+      this.file = requireNonNull(file);
+      checkArgument(offset >= 0, "Offset is negative: %s", offset);
+      this.offset = offset;
+    }
+
+    /**
+     * Create a new stream: open the file, move its channel position to
+     * the offset, then wrap it in a {@code BufferedInputStream}.
+     * <p>
+     * The new stream is saved through {@code setCurrentStream()} here;
+     * {@code newStream()} saves the returned value again, so this call
+     * looks redundant.
+     * <p>
+     * NOTE(review): nothing in this method limits the stream to
+     * {@code getSize()} bytes; presumably the reader stops after the
+     * declared content length -confirm.
+     * <p>
+     * NOTE(review): if repositioning the channel fails, the
+     * {@code FileInputStream} opened just before it appears to be left
+     * unclosed -confirm.
+     * @return a stream at the start of the offset in the file
+     * @throws UncheckedIOException on IO failure.
+     */
+    @Override
+    protected BufferedInputStream createNewStream() throws 
UncheckedIOException {
+      // create the stream, seek to the offset.
+      final FileInputStream fis = uncheckIOExceptions(() -> {
+        final FileInputStream f = new FileInputStream(file);
+        f.getChannel().position(offset);
+        return f;
+      });
+      return setCurrentStream(new BufferedInputStream(fis));
+    }
+
+    @Override
+    public String toString() {
+      return "FileWithOffsetContentProvider{" +
+          "file=" + file +
+          ", offset=" + offset +
+          "} " + super.toString();
+    }
+
+  }
+
+  /**
+   * Create a content provider for a byte buffer.
+   * Uses {@link ByteBufferInputStream} to read the data.
+   */
+  private static final class ByteBufferContentProvider
+      extends BaseContentProvider<ByteBufferInputStream> {
+
+    /**
+     * The buffer which will be read; on or off heap.
+     */
+    private final ByteBuffer blockBuffer;
+
+    /**
+     * The position in the buffer at the time the provider was created.
+     */
+    private final int initialPosition;
+
+    /**
+     * Constructor.
+     * @param blockBuffer buffer to read.
+     * @param size size of the data.
+     * @throws IllegalArgumentException if the arguments are invalid.
+     * @throws NullPointerException if the buffer is null
+     */
+    private ByteBufferContentProvider(final ByteBuffer blockBuffer, int size) {
+      super(size);
+      this.blockBuffer = blockBuffer;
+      this.initialPosition = blockBuffer.position();
+    }
+
+    @Override
+    protected ByteBufferInputStream createNewStream() {
+      // set the buffer up for reading from the beginning
+      blockBuffer.limit(initialPosition);

Review Comment:
   I'm wondering why setting the limit is important here?



##########
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md:
##########
@@ -501,7 +543,43 @@ endpoint and region like the following:
   <value>${sts.region}</value>
 </property>
 ```
+## <a name="500_internal_error"></a> HTTP 500 status code "We encountered an 
internal error"
+
+```
+We encountered an internal error. Please try again.
+(Service: S3, Status Code: 500, Request ID: <id>, Extended Request ID: 
<extended-id>)
+```
+
+The [status code 
500](https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500) indicates
+the S3 store has reported an internal problem.
+When raised by Amazon S3, we believe this is a rare sign of a problem within 
the S3 system
+or another part of the cloud infrastructure on which it depends.
+Retrying _should_ make it go away.
+
+The 500 error is considered retryable by the AWS SDK, which will have already
+tried it `fs.s3a.attempts.maximum` times before reaching the S3A client -which

Review Comment:
   nit: should "tried it" be "retried it"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to