[ https://issues.apache.org/jira/browse/HADOOP-13560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15575312#comment-15575312 ]
ASF GitHub Bot commented on HADOOP-13560:
-----------------------------------------
Github user thodemoor commented on a diff in the pull request:
https://github.com/apache/hadoop/pull/130#discussion_r83419562
--- Diff:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
---
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.event.ProgressEventType;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.util.Progressable;
+
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.Statistic.*;
+
+/**
+ * Upload files/parts directly via different buffering mechanisms,
+ * including memory and disk.
+ *
+ * If the stream is closed and no multipart upload has started, then the
+ * upload is instead done as a single PUT operation.
+ *
+ * Unstable: statistics and error handling might evolve.
+ */
[email protected]
[email protected]
+class S3ABlockOutputStream extends OutputStream {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(S3ABlockOutputStream.class);
+
+ /** Owner FileSystem. */
+ private final S3AFileSystem fs;
+
+ /** Object being uploaded. */
+ private final String key;
+
+ /** Size of all blocks. */
+ private final int blockSize;
+
+ /** Callback for progress. */
+ private final ProgressListener progressListener;
+ private final ListeningExecutorService executorService;
+
+ /**
+ * Retry policy for multipart commits; not all AWS SDK versions retry that.
+ */
+ private final RetryPolicy retryPolicy =
+ RetryPolicies.retryUpToMaximumCountWithProportionalSleep(
+ 5,
+ 2000,
+ TimeUnit.MILLISECONDS);
+ /**
+ * Factory for blocks.
+ */
+ private final S3ADataBlocks.BlockFactory blockFactory;
+
+ /** Preallocated byte buffer for writing single characters. */
+ private final byte[] singleCharWrite = new byte[1];
+
+ /** Multipart upload details; null means none started. */
+ private MultiPartUpload multiPartUpload;
+
+ /** Closed flag. */
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ /** Current data block. Null means none currently active */
+ private S3ADataBlocks.DataBlock activeBlock;
+
+ /** Count of blocks uploaded. */
+ private long blockCount = 0;
+
+ /** Statistics to build up. */
+ private final S3AInstrumentation.OutputStreamStatistics statistics;
+
+ /**
+ * Write operation helper; encapsulation of the filesystem operations.
+ */
+ private final S3AFileSystem.WriteOperationHelper writeOperationHelper;
+
+ /**
+ * An S3A output stream which uploads partitions in a separate pool of
+ * threads; different {@link S3ADataBlocks.BlockFactory}
+ * instances can control where data is buffered.
+ *
+ * @param fs S3AFilesystem
+ * @param key S3 object to work on.
+ * @param executorService the executor service to use to schedule work
+ * @param progress report progress in order to prevent timeouts. If
+ * this object implements {@code ProgressListener} then it will be
+ * directly wired up to the AWS client and so will receive detailed progress
+ * information.
+ * @param blockSize size of a single block.
+ * @param blockFactory factory for creating stream destinations
+ * @param statistics stats for this stream
+ * @param writeOperationHelper state of the write operation.
+ * @throws IOException on any problem
+ */
+ S3ABlockOutputStream(S3AFileSystem fs,
+ String key,
+ ExecutorService executorService,
+ Progressable progress,
+ long blockSize,
+ S3ADataBlocks.BlockFactory blockFactory,
+ S3AInstrumentation.OutputStreamStatistics statistics,
+ S3AFileSystem.WriteOperationHelper writeOperationHelper)
+ throws IOException {
+ this.fs = fs;
+ this.key = key;
+ this.blockFactory = blockFactory;
+ this.blockSize = (int) blockSize;
+ this.statistics = statistics;
+ this.writeOperationHelper = writeOperationHelper;
+ Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE,
+ "Block size is too small: %d", blockSize);
+ this.executorService = MoreExecutors.listeningDecorator(executorService);
+ this.multiPartUpload = null;
+ this.progressListener = (progress instanceof ProgressListener) ?
+ (ProgressListener) progress
+ : new ProgressableListener(progress);
+ // create that first block. This guarantees that an open + close sequence
+ // writes a 0-byte entry.
+ createBlockIfNeeded();
+ LOG.debug("Initialized S3ABlockOutputStream for {}" +
+ " output to {}", writeOperationHelper, activeBlock);
+ }
+
+ /**
+ * Demand create a destination block.
+ * @return the active block, creating one if needed; never null.
+ * @throws IOException on any failure to create
+ */
+ private synchronized S3ADataBlocks.DataBlock createBlockIfNeeded()
+ throws IOException {
+ if (activeBlock == null) {
+ blockCount++;
+ activeBlock = blockFactory.create(this.blockSize);
+ }
+ return activeBlock;
+ }
+
+ /**
+ * Synchronized accessor to the active block.
+ * @return the active block; null if there isn't one.
+ */
+ private synchronized S3ADataBlocks.DataBlock getActiveBlock() {
+ return activeBlock;
+ }
+
+ /**
+ * Predicate to query whether or not there is an active block.
+ * @return true if there is an active block.
+ */
+ private synchronized boolean hasActiveBlock() {
+ return activeBlock != null;
+ }
+
+ /**
+ * Clear the active block.
+ */
+ private void clearActiveBlock() {
+ LOG.debug("Clearing active block");
+ synchronized (this) {
+ activeBlock = null;
+ }
+ }
+
+ /**
+ * Check for the filesystem being open.
+ * @throws IOException if the filesystem is closed.
+ */
+ void checkOpen() throws IOException {
+ if (closed.get()) {
+ throw new IOException("Filesystem " + writeOperationHelper + " closed");
+ }
+ }
+
+ /**
+ * The flush operation does not trigger an upload; that awaits
+ * the next block being full. What it does do is call {@code flush() }
+ * on the current block, leaving it to choose how to react.
+ * @throws IOException Any IO problem.
+ */
+ @Override
+ public synchronized void flush() throws IOException {
+ checkOpen();
+ S3ADataBlocks.DataBlock dataBlock = getActiveBlock();
+ if (dataBlock != null) {
+ dataBlock.flush();
+ }
+ }
+
+ /**
+ * Writes a byte to the destination. If this causes the buffer to reach
+ * its limit, the actual upload is submitted to the threadpool.
+ * @param b the int of which the lowest byte is written
+ * @throws IOException on any problem
+ */
+ @Override
+ public synchronized void write(int b) throws IOException {
+ singleCharWrite[0] = (byte)b;
+ write(singleCharWrite, 0, 1);
+ }
+
+ /**
+ * Writes a range of bytes to the memory buffer. If this causes the
+ * buffer to reach its limit, the actual upload is submitted to the
+ * threadpool and the remainder of the array is written to memory
+ * (recursively).
+ * @param source byte array containing the data to write
+ * @param offset offset in array where to start
+ * @param len number of bytes to be written
+ * @throws IOException on any problem
+ */
+ @Override
+ public synchronized void write(byte[] source, int offset, int len)
+ throws IOException {
+
+ S3ADataBlocks.validateWriteArgs(source, offset, len);
+ checkOpen();
+ if (len == 0) {
+ return;
+ }
+ S3ADataBlocks.DataBlock block = createBlockIfNeeded();
+ int written = block.write(source, offset, len);
+ int remainingCapacity = block.remainingCapacity();
+ if (written < len) {
+ // not everything was written; the block has run out of capacity.
+ // Trigger an upload then process the remainder.
+ LOG.debug("writing more data than block has capacity -triggering upload");
+ uploadCurrentBlock();
+ // tail recursion is mildly expensive, but given buffer sizes must be MB.
--- End diff --
We can. With the minimum part size of 5MB you need a ~50GB upload to test
this, which will take a while against AWS. We can test it cheaply, though
only against our S3-clone, but at least that will exercise the log at error
level.
@pieterreuse please add this to our test plan.
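For reference, a rough sketch of where that ~50GB figure comes from, assuming
the documented S3 multipart limits of a 5MB minimum part size and 10,000 parts
per upload (the class and constant names below are illustrative, not taken
from the patch):

public class PartLimitArithmetic {
  // Documented S3 multipart limits; not constants taken from the patch.
  private static final long MIN_PART_SIZE_BYTES = 5L * 1024 * 1024; // 5 MB
  private static final int MAX_PARTS_PER_UPLOAD = 10_000;

  public static void main(String[] args) {
    // Smallest upload that can exhaust the part count at the minimum part size.
    long bytes = MIN_PART_SIZE_BYTES * MAX_PARTS_PER_UPLOAD;
    double gb = bytes / (1024.0 * 1024 * 1024);
    System.out.printf("Need roughly %.1f GB to hit the %d-part limit%n",
        gb, MAX_PARTS_PER_UPLOAD);
    // prints ~48.8 GB, i.e. the "50GB upload" mentioned above
  }
}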
> S3ABlockOutputStream to support huge (many GB) file writes
> ----------------------------------------------------------
>
> Key: HADOOP-13560
> URL: https://issues.apache.org/jira/browse/HADOOP-13560
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Affects Versions: 2.9.0
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Attachments: HADOOP-13560-branch-2-001.patch,
> HADOOP-13560-branch-2-002.patch, HADOOP-13560-branch-2-003.patch,
> HADOOP-13560-branch-2-004.patch
>
>
> An AWS SDK [issue|https://github.com/aws/aws-sdk-java/issues/367] highlights
> that metadata isn't copied on large copies.
> 1. Add a test to do that large copy/rename and verify that the copy really
> works
> 2. Verify that metadata makes it over.
> Verifying large file rename is important on its own, as it is needed for very
> large commit operations for committers using rename
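As a possible starting point for that test plan, here is a minimal sketch of
the checks items 1 and 2 above describe: rename a large object through the
FileSystem API, then compare length and user metadata before and after via the
AWS SDK client. The class name, the key derivation, and the assumption that
the test harness supplies the AmazonS3 client and bucket name are all
illustrative, not code from the patch:

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ObjectMetadata;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LargeRenameCheck {

  // Sketch only: "s3" and "bucket" are assumed to be provided by the
  // surrounding test setup; pathToKey simply strips the leading slash.
  static void verifyLargeRename(FileSystem fs, AmazonS3 s3, String bucket,
      Path src, Path dest) throws Exception {
    long srcLen = fs.getFileStatus(src).getLen();
    ObjectMetadata before = s3.getObjectMetadata(bucket, pathToKey(src));

    assertTrue("rename returned false", fs.rename(src, dest));

    // 1. the copy really works: same length at the destination
    assertEquals("length changed by rename",
        srcLen, fs.getFileStatus(dest).getLen());

    // 2. metadata makes it over: user metadata survives the copy
    ObjectMetadata after = s3.getObjectMetadata(bucket, pathToKey(dest));
    assertEquals("user metadata lost on copy",
        before.getUserMetadata(), after.getUserMetadata());
  }

  private static String pathToKey(Path path) {
    return path.toUri().getPath().substring(1);
  }
}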