[ https://issues.apache.org/jira/browse/HADOOP-13560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15574767#comment-15574767 ]

ASF GitHub Bot commented on HADOOP-13560:
-----------------------------------------

Github user thodemoor commented on a diff in the pull request:

    https://github.com/apache/hadoop/pull/130#discussion_r83379622
  
    --- Diff: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java ---
    @@ -0,0 +1,699 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.hadoop.fs.s3a;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +import com.amazonaws.AmazonClientException;
    +import com.amazonaws.event.ProgressEvent;
    +import com.amazonaws.event.ProgressEventType;
    +import com.amazonaws.event.ProgressListener;
    +import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
    +import com.amazonaws.services.s3.model.PartETag;
    +import com.amazonaws.services.s3.model.PutObjectRequest;
    +import com.amazonaws.services.s3.model.PutObjectResult;
    +import com.amazonaws.services.s3.model.UploadPartRequest;
    +import com.google.common.base.Preconditions;
    +import com.google.common.util.concurrent.Futures;
    +import com.google.common.util.concurrent.ListenableFuture;
    +import com.google.common.util.concurrent.ListeningExecutorService;
    +import com.google.common.util.concurrent.MoreExecutors;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.hadoop.classification.InterfaceAudience;
    +import org.apache.hadoop.classification.InterfaceStability;
    +import org.apache.hadoop.io.IOUtils;
    +import org.apache.hadoop.io.retry.RetryPolicies;
    +import org.apache.hadoop.io.retry.RetryPolicy;
    +import org.apache.hadoop.util.Progressable;
    +
    +import static org.apache.hadoop.fs.s3a.S3AUtils.*;
    +import static org.apache.hadoop.fs.s3a.Statistic.*;
    +
    +/**
    + * Upload files/parts directly via different buffering mechanisms,
    + * including memory and disk.
    + *
    + * If the stream is closed and no multipart upload has started, then the
    + * upload is instead done as a single PUT operation.
    + *
    + * Unstable: statistics and error handling might evolve.
    + */
    +@InterfaceAudience.Private
    +@InterfaceStability.Unstable
    +class S3ABlockOutputStream extends OutputStream {
    +
    +  private static final Logger LOG =
    +      LoggerFactory.getLogger(S3ABlockOutputStream.class);
    +
    +  /** Owner FileSystem. */
    +  private final S3AFileSystem fs;
    +
    +  /** Object being uploaded. */
    +  private final String key;
    +
    +  /** Size of all blocks. */
    +  private final int blockSize;
    +
    +  /** Callback for progress. */
    +  private final ProgressListener progressListener;
    +  private final ListeningExecutorService executorService;
    +
    +  /**
    +   * Retry policy for multipart commits; not all AWS SDK versions retry that.
    +   */
    +  private final RetryPolicy retryPolicy =
    +      RetryPolicies.retryUpToMaximumCountWithProportionalSleep(
    +          5,
    +          2000,
    +          TimeUnit.MILLISECONDS);
    +  /**
    +   * Factory for blocks.
    +   */
    +  private final S3ADataBlocks.BlockFactory blockFactory;
    +
    +  /** Preallocated byte buffer for writing single characters. */
    +  private final byte[] singleCharWrite = new byte[1];
    +
    +  /** Multipart upload details; null means none started. */
    +  private MultiPartUpload multiPartUpload;
    +
    +  /** Closed flag. */
    +  private final AtomicBoolean closed = new AtomicBoolean(false);
    +
    +  /** Current data block. Null means none currently active */
    +  private S3ADataBlocks.DataBlock activeBlock;
    +
    +  /** Count of blocks uploaded. */
    +  private long blockCount = 0;
    +
    +  /** Statistics to build up. */
    +  private final S3AInstrumentation.OutputStreamStatistics statistics;
    +
    +  /**
    +   * Write operation helper; encapsulation of the filesystem operations.
    +   */
    +  private final S3AFileSystem.WriteOperationHelper writeOperationHelper;
    +
    +  /**
    +   * An S3A output stream which uploads partitions in a separate pool of
    +   * threads; different {@link S3ADataBlocks.BlockFactory}
    +   * instances can control where data is buffered.
    +   *
    +   * @param fs S3AFilesystem
    +   * @param key S3 object to work on.
    +   * @param executorService the executor service to use to schedule work
    +   * @param progress report progress in order to prevent timeouts. If
    +   * this object implements {@code ProgressListener} then it will be
    +   * directly wired up to the AWS client, and will receive detailed
    +   * progress information.
    +   * @param blockSize size of a single block.
    +   * @param blockFactory factory for creating stream destinations
    +   * @param statistics stats for this stream
    +   * @param writeOperationHelper state of the write operation.
    +   * @throws IOException on any problem
    +   */
    +  S3ABlockOutputStream(S3AFileSystem fs,
    +      String key,
    +      ExecutorService executorService,
    +      Progressable progress,
    +      long blockSize,
    +      S3ADataBlocks.BlockFactory blockFactory,
    +      S3AInstrumentation.OutputStreamStatistics statistics,
    +      S3AFileSystem.WriteOperationHelper writeOperationHelper)
    +      throws IOException {
    +    this.fs = fs;
    +    this.key = key;
    +    this.blockFactory = blockFactory;
    +    this.blockSize = (int) blockSize;
    +    this.statistics = statistics;
    +    this.writeOperationHelper = writeOperationHelper;
    +    Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE,
    +        "Block size is too small: %s", blockSize);
    +    this.executorService = MoreExecutors.listeningDecorator(executorService);
    +    this.multiPartUpload = null;
    +    this.progressListener = (progress instanceof ProgressListener) ?
    +        (ProgressListener) progress
    +        : new ProgressableListener(progress);
    +    // create that first block. This guarantees that an open + close sequence
    +    // writes a 0-byte entry.
    +    createBlockIfNeeded();
    +    LOG.debug("Initialized S3ABlockOutputStream for {}" +
    +        " output to {}", writeOperationHelper, activeBlock);
    +  }
    +
    +  /**
    +   * Demand create a destination block.
    +   * @return the active block; null if there isn't one.
    +   * @throws IOException on any failure to create
    +   */
    +  private synchronized S3ADataBlocks.DataBlock createBlockIfNeeded()
    +      throws IOException {
    +    if (activeBlock == null) {
    +      blockCount++;
    +      activeBlock = blockFactory.create(this.blockSize);
    +    }
    +    return activeBlock;
    +  }
    +
    +  /**
    +   * Synchronized accessor to the active block.
    +   * @return the active block; null if there isn't one.
    +   */
    +  private synchronized S3ADataBlocks.DataBlock getActiveBlock() {
    +    return activeBlock;
    +  }
    +
    +  /**
    +   * Predicate to query whether or not there is an active block.
    +   * @return true if there is an active block.
    +   */
    +  private synchronized boolean hasActiveBlock() {
    +    return activeBlock != null;
    +  }
    +
    +  /**
    +   * Clear the active block.
    +   */
    +  private void clearActiveBlock() {
    +    LOG.debug("Clearing active block");
    +    synchronized (this) {
    +      activeBlock = null;
    +    }
    +  }
    +
    +  /**
    +   * Check for the filesystem being open.
    +   * @throws IOException if the filesystem is closed.
    +   */
    +  void checkOpen() throws IOException {
    +    if (closed.get()) {
    +      throw new IOException("Filesystem " + writeOperationHelper + " closed");
    +    }
    +  }
    +
    +  /**
    +   * The flush operation does not trigger an upload; that awaits
    +   * the next block being full. What it does do is call {@code flush() }
    +   * on the current block, leaving it to choose how to react.
    +   * @throws IOException Any IO problem.
    +   */
    +  @Override
    +  public synchronized void flush() throws IOException {
    +    checkOpen();
    +    S3ADataBlocks.DataBlock dataBlock = getActiveBlock();
    +    if (dataBlock != null) {
    +      dataBlock.flush();
    +    }
    +  }
    +
    +  /**
    +   * Writes a byte to the destination. If this causes the buffer to reach
    +   * its limit, the actual upload is submitted to the threadpool.
    +   * @param b the int of which the lowest byte is written
    +   * @throws IOException on any problem
    +   */
    +  @Override
    +  public synchronized void write(int b) throws IOException {
    +    singleCharWrite[0] = (byte)b;
    +    write(singleCharWrite, 0, 1);
    +  }
    +
    +  /**
    +   * Writes a range of bytes to the memory buffer. If this causes the
    +   * buffer to reach its limit, the actual upload is submitted to the
    +   * threadpool and the remainder of the array is written to memory
    +   * (recursively).
    +   * @param source byte array containing the data to write
    +   * @param offset offset in array where to start
    +   * @param len number of bytes to be written
    +   * @throws IOException on any problem
    +   */
    +  @Override
    +  public synchronized void write(byte[] source, int offset, int len)
    +      throws IOException {
    +
    +    S3ADataBlocks.validateWriteArgs(source, offset, len);
    +    checkOpen();
    +    if (len == 0) {
    +      return;
    +    }
    +    S3ADataBlocks.DataBlock block = createBlockIfNeeded();
    +    int written = block.write(source, offset, len);
    +    int remainingCapacity = block.remainingCapacity();
    +    if (written < len) {
    +      // not everything was written: the block has run out of capacity.
    +      // Trigger an upload then process the remainder.
    +      LOG.debug("writing more data than block has capacity -triggering 
upload");
    +      uploadCurrentBlock();
    +      // tail recursion is mildly expensive, but given buffer sizes must be MB.
    --- End diff --
    
    FYI: up to 10k. That's AWS's limit on the number of parts in a single multipart upload.
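
For context, a back-of-the-envelope sketch of what that part ceiling implies for the largest object this stream can write at a given block size. Illustrative only, not part of the patch: the class and constant names below are made up, while the 10,000-part ceiling and the 5 MB minimum part size are S3's documented multipart-upload limits.

    // Illustrative arithmetic only; names are hypothetical, not from the patch.
    public final class PartLimitSketch {
      private static final int MAX_PARTS = 10_000;        // S3 limit on parts per multipart upload
      private static final long MIN_PART_SIZE = 5L << 20; // 5 MB minimum for all but the last part

      /** Largest object writable when every uploaded block is blockSize bytes. */
      static long maxObjectSize(long blockSize) {
        return Math.max(blockSize, MIN_PART_SIZE) * MAX_PARTS;
      }

      public static void main(String[] args) {
        // With 100 MB blocks the ceiling is roughly 1 TB; S3 itself caps a
        // single object at 5 TB, so larger targets need a larger block size.
        System.out.println(maxObjectSize(100L << 20) + " bytes");
      }
    }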


> S3ABlockOutputStream to support huge (many GB) file writes
> ----------------------------------------------------------
>
>                 Key: HADOOP-13560
>                 URL: https://issues.apache.org/jira/browse/HADOOP-13560
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 2.9.0
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>         Attachments: HADOOP-13560-branch-2-001.patch, 
> HADOOP-13560-branch-2-002.patch, HADOOP-13560-branch-2-003.patch, 
> HADOOP-13560-branch-2-004.patch
>
>
> An AWS SDK [issue|https://github.com/aws/aws-sdk-java/issues/367] highlights 
> that metadata isn't copied on large copies.
> 1. Add a test to do that large copy/rename and verify that the copy really works.
> 2. Verify that metadata makes it over.
> Verifying large file rename is important on its own, as it is needed for very 
> large commit operations for committers using rename.
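
The description above calls for a large copy/rename test plus a metadata check; a minimal sketch of that flow against the plain FileSystem API follows. The bucket, paths, sizes, and class name are made up, and real coverage would live in the S3A scale/contract tests; verifying the S3 object metadata would additionally need the AWS client, which is not shown.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HugeRenameSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path src = new Path("s3a://example-bucket/test/huge.bin");
        Path dst = new Path("s3a://example-bucket/test/huge-renamed.bin");
        FileSystem fs = src.getFileSystem(conf);

        // write the file in 1 MB chunks; the real test writes many GB
        long size = 64L << 20;
        byte[] chunk = new byte[1 << 20];
        try (FSDataOutputStream out = fs.create(src, true)) {
          for (long written = 0; written < size; written += chunk.length) {
            out.write(chunk);
          }
        }

        // rename and verify the copy really happened and the length survived
        if (!fs.rename(src, dst)) {
          throw new AssertionError("rename returned false");
        }
        FileStatus status = fs.getFileStatus(dst);
        if (status.getLen() != size) {
          throw new AssertionError("length mismatch after rename: " + status.getLen());
        }
      }
    }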


