http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index da1fc5a..ef5a434 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -24,7 +24,12 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem.Statistics; +import org.apache.hadoop.metrics2.AbstractMetric; import org.apache.hadoop.metrics2.MetricStringBuilder; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsTag; import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.lib.Interns; import org.apache.hadoop.metrics2.lib.MetricsRegistry; @@ -122,8 +127,23 @@ public class S3AInstrumentation { STREAM_WRITE_BLOCK_UPLOADS_ABORTED, STREAM_WRITE_TOTAL_TIME, STREAM_WRITE_TOTAL_DATA, + COMMITTER_COMMITS_CREATED, + COMMITTER_COMMITS_COMPLETED, + COMMITTER_JOBS_SUCCEEDED, + COMMITTER_JOBS_FAILED, + COMMITTER_TASKS_SUCCEEDED, + COMMITTER_TASKS_FAILED, + COMMITTER_BYTES_COMMITTED, + COMMITTER_BYTES_UPLOADED, + COMMITTER_COMMITS_FAILED, + COMMITTER_COMMITS_ABORTED, + COMMITTER_COMMITS_REVERTED, + COMMITTER_MAGIC_FILES_CREATED, S3GUARD_METADATASTORE_PUT_PATH_REQUEST, - S3GUARD_METADATASTORE_INITIALIZATION + S3GUARD_METADATASTORE_INITIALIZATION, + S3GUARD_METADATASTORE_RETRY, + S3GUARD_METADATASTORE_THROTTLED, + STORE_IO_THROTTLED }; @@ -179,8 +199,11 @@ public class S3AInstrumentation { gauge(statistic.getSymbol(), statistic.getDescription()); } //todo need a config for the quantiles interval? + int interval = 1; quantiles(S3GUARD_METADATASTORE_PUT_PATH_LATENCY, - "ops", "latency", 1); + "ops", "latency", interval); + quantiles(S3GUARD_METADATASTORE_THROTTLE_RATE, + "events", "frequency (Hz)", interval); } /** @@ -372,7 +395,7 @@ public class S3AInstrumentation { } /** - * Indicate that S3A deleted one or more file.s + * Indicate that S3A deleted one or more files. * @param count number of files. */ public void fileDeleted(int count) { @@ -506,6 +529,14 @@ public class S3AInstrumentation { } /** + * Create a new instance of the committer statistics. + * @return a new committer statistics instance + */ + CommitterStatistics newCommitterStatistics() { + return new CommitterStatistics(); + } + + /** * Merge in the statistics of a single input stream into * the filesystem-wide statistics. * @param statistics stream statistics @@ -584,9 +615,12 @@ public class S3AInstrumentation { /** * The inner stream was opened. + * @return the previous count */ - public void streamOpened() { + public long streamOpened() { + long count = openOperations; openOperations++; + return count; } /** @@ -810,10 +844,13 @@ public class S3AInstrumentation { } /** - * Note an exception in a multipart complete. + * Note exception in a multipart complete. 
+ * @param count count of exceptions */ - void exceptionInMultipartComplete() { - exceptionsInMultipartFinalize.incrementAndGet(); + void exceptionInMultipartComplete(int count) { + if (count > 0) { + exceptionsInMultipartFinalize.addAndGet(count); + } } /** @@ -832,6 +869,15 @@ public class S3AInstrumentation { } /** + * Data has been uploaded to be committed in a subsequent operation; + * to be called at the end of the write. + * @param size size in bytes + */ + public void commitUploaded(long size) { + incrementCounter(COMMITTER_BYTES_UPLOADED, size); + } + + /** * Output stream has closed. * Trigger merge in of all statistics not updated during operation. */ @@ -918,5 +964,176 @@ public class S3AInstrumentation { public void storeClosed() { } + + /** + * Throttled request. + */ + public void throttled() { + incrementCounter(S3GUARD_METADATASTORE_THROTTLED, 1); + addValueToQuantiles(S3GUARD_METADATASTORE_THROTTLE_RATE, 1); + } + + /** + * S3Guard is retrying after a (retryable) failure. + */ + public void retrying() { + incrementCounter(S3GUARD_METADATASTORE_RETRY, 1); + } + } + + /** + * Instrumentation exported to S3Guard Committers. + */ + @InterfaceAudience.Private + @InterfaceStability.Unstable + public final class CommitterStatistics { + + /** A commit has been created. */ + public void commitCreated() { + incrementCounter(COMMITTER_COMMITS_CREATED, 1); + } + + /** + * Data has been uploaded to be committed in a subsequent operation. + * @param size size in bytes + */ + public void commitUploaded(long size) { + incrementCounter(COMMITTER_BYTES_UPLOADED, size); + } + + /** + * A commit has been completed. + * @param size size in bytes + */ + public void commitCompleted(long size) { + incrementCounter(COMMITTER_COMMITS_COMPLETED, 1); + incrementCounter(COMMITTER_BYTES_COMMITTED, size); + } + + /** A commit has been aborted. */ + public void commitAborted() { + incrementCounter(COMMITTER_COMMITS_ABORTED, 1); + } + + public void commitReverted() { + incrementCounter(COMMITTER_COMMITS_REVERTED, 1); + } + + public void commitFailed() { + incrementCounter(COMMITTER_COMMITS_FAILED, 1); + } + + public void taskCompleted(boolean success) { + incrementCounter( + success ? COMMITTER_TASKS_SUCCEEDED + : COMMITTER_TASKS_FAILED, + 1); + } + + public void jobCompleted(boolean success) { + incrementCounter( + success ? COMMITTER_JOBS_SUCCEEDED + : COMMITTER_JOBS_FAILED, + 1); + } + } + + /** + * Copy all the metrics to a map of (name, long-value). + * @return a map of the metrics + */ + public Map<String, Long> toMap() { + MetricsToMap metricBuilder = new MetricsToMap(null); + registry.snapshot(metricBuilder, true); + for (Map.Entry<String, MutableCounterLong> entry : + streamMetrics.entrySet()) { + metricBuilder.tuple(entry.getKey(), entry.getValue().value()); + } + return metricBuilder.getMap(); + } + + /** + * Convert all metrics to a map. 
+ */ + private static class MetricsToMap extends MetricsRecordBuilder { + private final MetricsCollector parent; + private final Map<String, Long> map = + new HashMap<>(COUNTERS_TO_CREATE.length * 2); + + MetricsToMap(MetricsCollector parent) { + this.parent = parent; + } + + @Override + public MetricsRecordBuilder tag(MetricsInfo info, String value) { + return this; + } + + @Override + public MetricsRecordBuilder add(MetricsTag tag) { + return this; + } + + @Override + public MetricsRecordBuilder add(AbstractMetric metric) { + return this; + } + + @Override + public MetricsRecordBuilder setContext(String value) { + return this; + } + + @Override + public MetricsRecordBuilder addCounter(MetricsInfo info, int value) { + return tuple(info, value); + } + + @Override + public MetricsRecordBuilder addCounter(MetricsInfo info, long value) { + return tuple(info, value); + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, int value) { + return tuple(info, value); + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, long value) { + return tuple(info, value); + } + + public MetricsToMap tuple(MetricsInfo info, long value) { + return tuple(info.name(), value); + } + + public MetricsToMap tuple(String name, long value) { + map.put(name, value); + return this; + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, float value) { + return tuple(info, (long) value); + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, double value) { + return tuple(info, (long) value); + } + + @Override + public MetricsCollector parent() { + return parent; + } + + /** + * Get the map. + * @return the map of metrics + */ + public Map<String, Long> getMap() { + return map; + } } }
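The S3ARetryPolicy added in the next file composes Hadoop's stock org.apache.hadoop.io.retry.RetryPolicies building blocks: a fixed-sleep policy for ordinary retries, an exponential back-off policy for throttling, a fail-fast policy for unrecoverable errors, and a per-exception-class map tying them together. The following is a minimal, standalone sketch of that composition pattern only; the class name, retry limits and intervals are illustrative and are not the S3A defaults read from Constants.

import java.io.EOFException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;

public class RetryPolicySketch {
  public static void main(String[] args) throws Exception {
    // "normal" failures: a few fixed-interval attempts
    RetryPolicy fixed = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
        3, 500, TimeUnit.MILLISECONDS);
    // throttling: fewer, exponentially spaced attempts (illustrative values)
    RetryPolicy throttle = RetryPolicies.exponentialBackoffRetry(
        5, 1000, TimeUnit.MILLISECONDS);
    // unrecoverable conditions fail fast
    RetryPolicy fail = RetryPolicies.TRY_ONCE_THEN_FAIL;

    // map exact exception classes to policies; unmapped classes get the default
    Map<Class<? extends Exception>, RetryPolicy> policies = new HashMap<>();
    policies.put(UnknownHostException.class, fail);
    policies.put(EOFException.class, fixed);
    // a throttling exception class would map to `throttle` here

    RetryPolicy composite = RetryPolicies.retryByException(fixed, policies);

    // ask the composite policy what to do after the first EOFException
    RetryPolicy.RetryAction action = composite.shouldRetry(
        new EOFException("connection reset"), 0, 0, true);
    System.out.println(action.action + " after " + action.delayMillis + " ms");
  }
}

For the first EOFException this should print a RETRY decision with the fixed 500 ms delay, which is the shape of answer S3ARetryPolicy returns to its callers; the real policy additionally translates AmazonClientException instances via S3AUtils.translateException before consulting its exception map.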
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java new file mode 100644 index 0000000..e37a554 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.EOFException; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.NoRouteToHostException; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; +import java.nio.file.AccessDeniedException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException; +import com.google.common.base.Preconditions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.InvalidRequestException; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.net.ConnectTimeoutException; + +import static org.apache.hadoop.io.retry.RetryPolicies.*; + +import static org.apache.hadoop.fs.s3a.Constants.*; + +/** + * The S3A request retry policy. + * + * This uses the retry options in the configuration file to determine retry + * count and delays for "normal" retries and separately, for throttling; + * the latter is best handled for longer with an exponential back-off. + * + * <ol> + * <li> Those exceptions considered unrecoverable (networking) are + * failed fast.</li> + * <li>All non-IOEs are failed immediately. Assumed: bugs in code, + * unrecoverable errors, etc</li> + * </ol> + * + * For non-idempotent operations, only failures due to throttling or + * from failures which are known to only arise prior to talking to S3 + * are retried. + * + * The retry policy is all built around that of the normal IO exceptions, + * particularly those extracted from + * {@link S3AUtils#translateException(String, Path, AmazonClientException)}. + * Because the {@link #shouldRetry(Exception, int, int, boolean)} method + * does this translation if an {@code AmazonClientException} is processed, + * the policy defined for the IOEs also applies to the original exceptions. 
+ * + * Put differently: this retry policy aims to work for handlers of the + * untranslated exceptions, as well as the translated ones. + * @see <a href="http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html">S3 Error responses</a> + * @see <a href="http://docs.aws.amazon.com/AmazonS3/latest/dev/ErrorBestPractices.html">Amazon S3 Error Best Practices</a> + * @see <a href="http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/CommonErrors.html">Dynamo DB Commmon errors</a> + */ +public class S3ARetryPolicy implements RetryPolicy { + + private final RetryPolicy retryPolicy; + + /** + * Instantiate. + * @param conf configuration to read. + */ + public S3ARetryPolicy(Configuration conf) { + Preconditions.checkArgument(conf != null, "Null configuration"); + + // base policy from configuration + RetryPolicy fixedRetries = retryUpToMaximumCountWithFixedSleep( + conf.getInt(RETRY_LIMIT, RETRY_LIMIT_DEFAULT), + conf.getTimeDuration(RETRY_INTERVAL, + RETRY_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS), + TimeUnit.MILLISECONDS); + + // which is wrapped by a rejection of all non-idempotent calls except + // for specific failures. + RetryPolicy retryIdempotentCalls = new FailNonIOEs( + new IdempotencyRetryFilter(fixedRetries)); + + // and a separate policy for throttle requests, which are considered + // repeatable, even for non-idempotent calls, as the service + // rejected the call entirely + RetryPolicy throttlePolicy = exponentialBackoffRetry( + conf.getInt(RETRY_THROTTLE_LIMIT, RETRY_THROTTLE_LIMIT_DEFAULT), + conf.getTimeDuration(RETRY_THROTTLE_INTERVAL, + RETRY_THROTTLE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS), + TimeUnit.MILLISECONDS); + + // no retry on network and tangible API issues + RetryPolicy fail = RetryPolicies.TRY_ONCE_THEN_FAIL; + + // client connectivity: fixed retries without care for idempotency + RetryPolicy connectivityFailure = fixedRetries; + + // the policy map maps the exact classname; subclasses do not + // inherit policies. + Map<Class<? extends Exception>, RetryPolicy> policyMap = new HashMap<>(); + + // failfast exceptions which we consider unrecoverable + policyMap.put(UnknownHostException.class, fail); + policyMap.put(NoRouteToHostException.class, fail); + policyMap.put(InterruptedException.class, fail); + // note this does not pick up subclasses (like socket timeout) + policyMap.put(InterruptedIOException.class, fail); + policyMap.put(AWSRedirectException.class, fail); + // interesting question: should this be retried ever? + policyMap.put(AccessDeniedException.class, fail); + policyMap.put(FileNotFoundException.class, fail); + policyMap.put(InvalidRequestException.class, fail); + + // should really be handled by resubmitting to new location; + // that's beyond the scope of this retry policy + policyMap.put(AWSRedirectException.class, fail); + + // throttled requests are can be retried, always + policyMap.put(AWSServiceThrottledException.class, throttlePolicy); + + // connectivity problems are retried without worrying about idempotency + policyMap.put(ConnectTimeoutException.class, connectivityFailure); + + // this can be a sign of an HTTP connection breaking early. + // which can be reacted to by another attempt if the request was idempotent. + // But: could also be a sign of trying to read past the EOF on a GET, + // which isn't going to be recovered from + policyMap.put(EOFException.class, retryIdempotentCalls); + + // policy on a 400/bad request still ambiguous. 
Given it + // comes and goes on test runs: try again + policyMap.put(AWSBadRequestException.class, connectivityFailure); + + // Status 500 error code is also treated as a connectivity problem + policyMap.put(AWSStatus500Exception.class, connectivityFailure); + + // server didn't respond. + policyMap.put(AWSNoResponseException.class, retryIdempotentCalls); + + // other operations + policyMap.put(AWSClientIOException.class, retryIdempotentCalls); + policyMap.put(AWSServiceIOException.class, retryIdempotentCalls); + policyMap.put(AWSS3IOException.class, retryIdempotentCalls); + policyMap.put(SocketTimeoutException.class, retryIdempotentCalls); + + // Dynamo DB exceptions + // asking for more than you should get. It's a retry but should be logged + // trigger sleep + policyMap.put(ProvisionedThroughputExceededException.class, throttlePolicy); + + retryPolicy = retryByException(retryIdempotentCalls, policyMap); + } + + @Override + public RetryAction shouldRetry(Exception exception, + int retries, + int failovers, + boolean idempotent) throws Exception { + Exception ex = exception; + if (exception instanceof AmazonClientException) { + // uprate the amazon client exception for the purpose of exception + // processing. + ex = S3AUtils.translateException("", "", + (AmazonClientException) exception); + } + return retryPolicy.shouldRetry(ex, retries, failovers, idempotent); + } + + /** + * Policy which fails fast any non-idempotent call; hands off + * all idempotent calls to the next retry policy. + */ + private static final class IdempotencyRetryFilter implements RetryPolicy { + + private final RetryPolicy next; + + IdempotencyRetryFilter(RetryPolicy next) { + this.next = next; + } + + @Override + public RetryAction shouldRetry(Exception e, + int retries, + int failovers, + boolean idempotent) throws Exception { + return + idempotent ? + next.shouldRetry(e, retries, failovers, true) + : RetryAction.FAIL; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "IdempotencyRetryFilter{"); + sb.append("next=").append(next); + sb.append('}'); + return sb.toString(); + } + } + + /** + * All non-IOE exceptions are failed. + */ + private static final class FailNonIOEs implements RetryPolicy { + + private final RetryPolicy next; + + private FailNonIOEs(RetryPolicy next) { + this.next = next; + } + + @Override + public RetryAction shouldRetry(Exception e, + int retries, + int failovers, + boolean isIdempotentOrAtMostOnce) throws Exception { + return + e instanceof IOException ? 
+ next.shouldRetry(e, retries, failovers, true) + : RetryAction.FAIL; + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java index c1cf7cf..4b12667 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.StorageStatistics; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.EnumMap; @@ -35,8 +36,10 @@ import java.util.concurrent.atomic.AtomicLong; */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class S3AStorageStatistics extends StorageStatistics { - private static final Logger LOG = S3AFileSystem.LOG; +public class S3AStorageStatistics extends StorageStatistics + implements Iterable<StorageStatistics.LongStatistic> { + private static final Logger LOG = + LoggerFactory.getLogger(S3AStorageStatistics.class); public static final String NAME = "S3AStorageStatistics"; private final Map<Statistic, AtomicLong> opsCount = @@ -97,6 +100,11 @@ public class S3AStorageStatistics extends StorageStatistics { } @Override + public Iterator<LongStatistic> iterator() { + return getLongStatistics(); + } + + @Override public Long getLong(String key) { final Statistic type = Statistic.fromSymbol(key); return type == null ? 
null : opsCount.get(type).get(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 6e6f4b6..70926e6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -18,11 +18,17 @@ package org.apache.hadoop.fs.s3a; +import com.amazonaws.AbortedException; import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; +import com.amazonaws.SdkBaseException; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.EnvironmentVariableCredentialsProvider; import com.amazonaws.auth.InstanceProfileCredentialsProvider; +import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException; +import com.amazonaws.services.dynamodbv2.model.LimitExceededException; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException; +import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException; import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.S3ObjectSummary; @@ -32,12 +38,18 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.s3native.S3xLoginHelper; +import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.security.ProviderUtils; import com.google.common.collect.Lists; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.EOFException; import java.io.FileNotFoundException; @@ -46,12 +58,15 @@ import java.io.InterruptedIOException; import java.lang.reflect.Constructor; import java.lang.reflect.Method; import java.lang.reflect.Modifier; +import java.net.SocketTimeoutException; import java.net.URI; import java.nio.file.AccessDeniedException; +import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutionException; import static org.apache.hadoop.fs.s3a.Constants.*; @@ -63,8 +78,7 @@ import static org.apache.hadoop.fs.s3a.Constants.*; @InterfaceStability.Evolving public final class S3AUtils { - /** Reuse the S3AFileSystem log. 
*/ - private static final Logger LOG = S3AFileSystem.LOG; + private static final Logger LOG = LoggerFactory.getLogger(S3AUtils.class); static final String CONSTRUCTOR_EXCEPTION = "constructor exception"; static final String INSTANTIATION_EXCEPTION = "instantiation exception"; @@ -95,6 +109,8 @@ public final class S3AUtils { S3AEncryptionMethods.SSE_S3.getMethod() + " is enabled but an encryption key was set in " + SERVER_SIDE_ENCRYPTION_KEY; + private static final String EOF_MESSAGE_IN_XML_PARSER + = "Failed to sanitize XML document destined for handler class"; private S3AUtils() { @@ -106,6 +122,10 @@ public final class S3AUtils { * {@link AmazonClientException} passed in, and any status codes included * in the operation. That is: HTTP error codes are examined and can be * used to build a more specific response. + * + * @see <a href="http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html">S3 Error responses</a> + * @see <a href="http://docs.aws.amazon.com/AmazonS3/latest/dev/ErrorBestPractices.html">Amazon S3 Error Best Practices</a> + * @see <a href="http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/CommonErrors.html">Dynamo DB Commmon errors</a> * @param operation operation * @param path path operated on (must not be null) * @param exception amazon exception raised @@ -131,19 +151,28 @@ public final class S3AUtils { @SuppressWarnings("ThrowableInstanceNeverThrown") public static IOException translateException(String operation, String path, - AmazonClientException exception) { + SdkBaseException exception) { String message = String.format("%s%s: %s", operation, path != null ? (" on " + path) : "", exception); if (!(exception instanceof AmazonServiceException)) { - if (containsInterruptedException(exception)) { - return (IOException)new InterruptedIOException(message) - .initCause(exception); + Exception innerCause = containsInterruptedException(exception); + if (innerCause != null) { + // interrupted IO, or a socket exception underneath that class + return translateInterruptedException(exception, innerCause, message); + } + if (signifiesConnectionBroken(exception)) { + // call considered an sign of connectivity failure + return (EOFException)new EOFException(message).initCause(exception); } return new AWSClientIOException(message, exception); } else { - + if (exception instanceof AmazonDynamoDBException) { + // special handling for dynamo DB exceptions + return translateDynamoDBException(message, + (AmazonDynamoDBException)exception); + } IOException ioe; AmazonServiceException ase = (AmazonServiceException) exception; // this exception is non-null if the service exception is an s3 one @@ -151,9 +180,11 @@ public final class S3AUtils { ? 
(AmazonS3Exception) ase : null; int status = ase.getStatusCode(); + message = message + ":" + ase.getErrorCode(); switch (status) { case 301: + case 307: if (s3Exception != null) { if (s3Exception.getAdditionalDetails() != null && s3Exception.getAdditionalDetails().containsKey(ENDPOINT_KEY)) { @@ -163,11 +194,16 @@ public final class S3AUtils { + "the bucket.", s3Exception.getAdditionalDetails().get(ENDPOINT_KEY), ENDPOINT); } - ioe = new AWSS3IOException(message, s3Exception); + ioe = new AWSRedirectException(message, s3Exception); } else { - ioe = new AWSServiceIOException(message, ase); + ioe = new AWSRedirectException(message, ase); } break; + + case 400: + ioe = new AWSBadRequestException(message, ase); + break; + // permissions case 401: case 403: @@ -186,6 +222,25 @@ public final class S3AUtils { // a shorter one while it is being read. case 416: ioe = new EOFException(message); + ioe.initCause(ase); + break; + + // this has surfaced as a "no response from server" message. + // so rare we haven't replicated it. + // Treating as an idempotent proxy error. + case 443: + case 444: + ioe = new AWSNoResponseException(message, ase); + break; + + // throttling + case 503: + ioe = new AWSServiceThrottledException(message, ase); + break; + + // internal error + case 500: + ioe = new AWSStatus500Exception(message, ase); break; default: @@ -226,21 +281,99 @@ public final class S3AUtils { * Recurse down the exception loop looking for any inner details about * an interrupted exception. * @param thrown exception thrown - * @return true if down the execution chain the operation was an interrupt + * @return the actual exception if the operation was an interrupt */ - static boolean containsInterruptedException(Throwable thrown) { + static Exception containsInterruptedException(Throwable thrown) { if (thrown == null) { - return false; + return null; } if (thrown instanceof InterruptedException || - thrown instanceof InterruptedIOException) { - return true; + thrown instanceof InterruptedIOException || + thrown instanceof AbortedException) { + return (Exception)thrown; } // tail recurse return containsInterruptedException(thrown.getCause()); } /** + * Handles translation of interrupted exception. This includes + * preserving the class of the fault for better retry logic + * @param exception outer exception + * @param innerCause inner cause (which is guaranteed to be some form + * of interrupted exception + * @param message message for the new exception. + * @return an IOE which can be rethrown + */ + private static InterruptedIOException translateInterruptedException( + SdkBaseException exception, + final Exception innerCause, + String message) { + InterruptedIOException ioe; + if (innerCause instanceof SocketTimeoutException) { + ioe = new SocketTimeoutException(message); + } else { + String name = innerCause.getClass().getName(); + if (name.endsWith(".ConnectTimeoutException") + || name.endsWith("$ConnectTimeoutException")) { + // TCP connection http timeout from the shaded or unshaded filenames + // com.amazonaws.thirdparty.apache.http.conn.ConnectTimeoutException + ioe = new ConnectTimeoutException(message); + } else { + // any other exception + ioe = new InterruptedIOException(message); + } + } + ioe.initCause(exception); + return ioe; + } + + /** + * Is the exception an instance of a throttling exception. That + * is an AmazonServiceException with a 503 response, any + * exception from DynamoDB for limits exceeded, or an + * {@link AWSServiceThrottledException}. 
+ * @param ex exception to examine + * @return true if it is considered a throttling exception + */ + public static boolean isThrottleException(Exception ex) { + return ex instanceof AWSServiceThrottledException + || ex instanceof ProvisionedThroughputExceededException + || ex instanceof LimitExceededException + || (ex instanceof AmazonServiceException + && 503 == ((AmazonServiceException)ex).getStatusCode()); + } + + /** + * Cue that an AWS exception is likely to be an EOF Exception based + * on the message coming back from an XML/JSON parser. This is likely + * to be brittle, so only a hint. + * @param ex exception + * @return true if this is believed to be a sign the connection was broken. + */ + public static boolean signifiesConnectionBroken(SdkBaseException ex) { + return ex.toString().contains(EOF_MESSAGE_IN_XML_PARSER); + } + + /** + * Translate a DynamoDB exception into an IOException. + * @param message preformatted message for the exception + * @param ex exception + * @return an exception to throw. + */ + public static IOException translateDynamoDBException(String message, + AmazonDynamoDBException ex) { + if (isThrottleException(ex)) { + return new AWSServiceThrottledException(message, ex); + } + if (ex instanceof ResourceNotFoundException) { + return (FileNotFoundException) new FileNotFoundException(message) + .initCause(ex); + } + return new AWSServiceIOException(message, ex); + } + + /** * Get low level details of an amazon exception for logging; multi-line. * @param e exception * @return string details @@ -587,7 +720,7 @@ public final class S3AUtils { } /** - * Get a long option >= the minimum allowed value, supporting memory + * Get a long option >= the minimum allowed value, supporting memory * prefixes K,M,G,T,P. * @param conf configuration * @param key key to look up @@ -596,7 +729,7 @@ public final class S3AUtils { * @return the value * @throws IllegalArgumentException if the value is below the minimum */ - static long longBytesOption(Configuration conf, + public static long longBytesOption(Configuration conf, String key, long defVal, long min) { @@ -746,6 +879,133 @@ public final class S3AUtils { return dest; } + + /** + * Delete a path quietly: failures are logged at DEBUG. + * @param fs filesystem + * @param path path + * @param recursive recursive? + */ + public static void deleteQuietly(FileSystem fs, + Path path, + boolean recursive) { + try { + fs.delete(path, recursive); + } catch (IOException e) { + LOG.debug("Failed to delete {}", path, e); + } + } + + /** + * Delete a path: failures are logged at WARN. + * @param fs filesystem + * @param path path + * @param recursive recursive? + */ + public static void deleteWithWarning(FileSystem fs, + Path path, + boolean recursive) { + try { + fs.delete(path, recursive); + } catch (IOException e) { + LOG.warn("Failed to delete {}", path, e); + } + } + + + /** + * An interface for use in lambda-expressions working with + * directory tree listings. + */ + @FunctionalInterface + public interface CallOnLocatedFileStatus { + void call(LocatedFileStatus status) throws IOException; + } + + /** + * An interface for use in lambda-expressions working with + * directory tree listings. + */ + @FunctionalInterface + public interface LocatedFileStatusMap<T> { + T call(LocatedFileStatus status) throws IOException; + } + + /** + * Apply an operation to every {@link LocatedFileStatus} in a remote + * iterator. 
+ * @param iterator iterator from a list + * @param eval closure to evaluate + * @throws IOException anything in the closure, or iteration logic. + */ + public static void applyLocatedFiles( + RemoteIterator<LocatedFileStatus> iterator, + CallOnLocatedFileStatus eval) throws IOException { + while (iterator.hasNext()) { + eval.call(iterator.next()); + } + } + + /** + * Map an operation to every {@link LocatedFileStatus} in a remote + * iterator, returning a list of the results. + * @param iterator iterator from a list + * @param eval closure to evaluate + * @throws IOException anything in the closure, or iteration logic. + */ + public static <T> List<T> mapLocatedFiles( + RemoteIterator<LocatedFileStatus> iterator, + LocatedFileStatusMap<T> eval) throws IOException { + final List<T> results = new ArrayList<>(); + applyLocatedFiles(iterator, + (s) -> results.add(eval.call(s))); + return results; + } + + /** + * Map an operation to every {@link LocatedFileStatus} in a remote + * iterator, returning a list of the all results which were not empty. + * @param iterator iterator from a list + * @param eval closure to evaluate + * @throws IOException anything in the closure, or iteration logic. + */ + public static <T> List<T> flatmapLocatedFiles( + RemoteIterator<LocatedFileStatus> iterator, + LocatedFileStatusMap<Optional<T>> eval) throws IOException { + final List<T> results = new ArrayList<>(); + applyLocatedFiles(iterator, + (s) -> eval.call(s).map(r -> results.add(r))); + return results; + } + + /** + * List located files and filter them as a classic listFiles(path, filter) + * would do. + * @param fileSystem filesystem + * @param path path to list + * @param recursive recursive listing? + * @param filter filter for the filename + * @return the filtered list of entries + * @throws IOException IO failure. + */ + public static List<LocatedFileStatus> listAndFilter(FileSystem fileSystem, + Path path, boolean recursive, PathFilter filter) throws IOException { + return flatmapLocatedFiles(fileSystem.listFiles(path, recursive), + status -> maybe(filter.accept(status.getPath()), status)); + } + + /** + * Convert a value into a non-empty Optional instance if + * the value of {@code include} is true. + * @param include flag to indicate the value is to be included. + * @param value value to return + * @param <T> type of option. + * @return if include is false, Optional.empty. Otherwise, the value. + */ + public static <T> Optional<T> maybe(boolean include, T value) { + return include ? Optional.of(value) : Optional.empty(); + } + /** * Patch the security credential provider information in * {@link #CREDENTIAL_PROVIDER_PATH} @@ -937,4 +1197,36 @@ public final class S3AUtils { return conf.get(FS_S3A_BUCKET_PREFIX + bucket + '.' + baseKey); } + + /** + * Path filter which ignores any file which starts with . or _. + */ + public static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() { + @Override + public boolean accept(Path path) { + String name = path.getName(); + return !name.startsWith("_") && !name.startsWith("."); + } + + @Override + public String toString() { + return "HIDDEN_FILE_FILTER"; + } + }; + + /** + * A Path filter which accepts all filenames. 
+ */ + public static final PathFilter ACCEPT_ALL = new PathFilter() { + @Override + public boolean accept(Path file) { + return true; + } + + @Override + public String toString() { + return "ACCEPT_ALL"; + } + }; + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListRequest.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListRequest.java index 6b3bd46..1a0d2c3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListRequest.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListRequest.java @@ -66,4 +66,15 @@ public class S3ListRequest { public ListObjectsV2Request getV2() { return v2Request; } + + @Override + public String toString() { + if (isV1()) { + return String.format("List %s:/%s", + v1Request.getBucketName(), v1Request.getPrefix()); + } else { + return String.format("List %s:/%s", + v2Request.getBucketName(), v2Request.getPrefix()); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java index 7c73a23..d1bff8a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java @@ -30,7 +30,7 @@ class S3ObjectAttributes { private S3AEncryptionMethods serverSideEncryptionAlgorithm; private String serverSideEncryptionKey; - public S3ObjectAttributes( + S3ObjectAttributes( String bucket, String key, S3AEncryptionMethods serverSideEncryptionAlgorithm, @@ -41,19 +41,19 @@ class S3ObjectAttributes { this.serverSideEncryptionKey = serverSideEncryptionKey; } - public String getBucket() { + String getBucket() { return bucket; } - public String getKey() { + String getKey() { return key; } - public S3AEncryptionMethods getServerSideEncryptionAlgorithm() { + S3AEncryptionMethods getServerSideEncryptionAlgorithm() { return serverSideEncryptionAlgorithm; } - public String getServerSideEncryptionKey() { + String getServerSideEncryptionKey() { return serverSideEncryptionKey; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java index 777c161..871d7c5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java @@ -77,6 +77,8 @@ public enum Statistic { "Number of continued object listings made"), OBJECT_METADATA_REQUESTS("object_metadata_requests", "Number of requests for object metadata"), + OBJECT_MULTIPART_UPLOAD_INITIATED("object_multipart_initiated", + "Object multipart upload initiated"), OBJECT_MULTIPART_UPLOAD_ABORTED("object_multipart_aborted", "Object multipart 
upload aborted"), OBJECT_PUT_REQUESTS("object_put_requests", @@ -142,16 +144,62 @@ public enum Statistic { STREAM_WRITE_QUEUE_DURATION("stream_write_queue_duration", "Total queue duration of all block uploads"), - // S3Guard stats + // S3guard committer stats + COMMITTER_COMMITS_CREATED( + "committer_commits_created", + "Number of files to commit created"), + COMMITTER_COMMITS_COMPLETED( + "committer_commits_completed", + "Number of files committed"), + COMMITTER_JOBS_SUCCEEDED( + "committer_jobs_completed", + "Number of successful jobs"), + COMMITTER_JOBS_FAILED( + "committer_jobs_failed", + "Number of failed jobs"), + COMMITTER_TASKS_SUCCEEDED( + "committer_tasks_completed", + "Number of successful tasks"), + COMMITTER_TASKS_FAILED( + "committer_tasks_failed", + "Number of failed tasks"), + COMMITTER_BYTES_COMMITTED( + "committer_bytes_committed", + "Amount of data committed"), + COMMITTER_BYTES_UPLOADED( + "committer_bytes_uploaded", + "Number of bytes uploaded duing commit operations"), + COMMITTER_COMMITS_FAILED( + "committer_commits_failed", + "Number of commits failed"), + COMMITTER_COMMITS_ABORTED( + "committer_commits_aborted", + "Number of commits aborted"), + COMMITTER_COMMITS_REVERTED( + "committer_commits_reverted", + "Number of commits reverted"), + COMMITTER_MAGIC_FILES_CREATED( + "committer_magic_files_created", + "Number of files created under 'magic' paths"), + + // S3guard stats S3GUARD_METADATASTORE_PUT_PATH_REQUEST( "s3guard_metadatastore_put_path_request", - "s3guard metadata store put one metadata path request"), + "S3Guard metadata store put one metadata path request"), S3GUARD_METADATASTORE_PUT_PATH_LATENCY( "s3guard_metadatastore_put_path_latency", - "s3guard metadata store put one metadata path lantency"), + "S3Guard metadata store put one metadata path latency"), S3GUARD_METADATASTORE_INITIALIZATION("s3guard_metadatastore_initialization", - "s3guard metadata store initialization times"); + "S3Guard metadata store initialization times"), + S3GUARD_METADATASTORE_RETRY("s3guard_metadatastore_retry", + "S3Guard metadata store retry events"), + S3GUARD_METADATASTORE_THROTTLED("s3guard_metadatastore_throttled", + "S3Guard metadata store throttled events"), + S3GUARD_METADATASTORE_THROTTLE_RATE( + "s3guard_metadatastore_throttle_rate", + "S3Guard metadata store throttle rate"), + STORE_IO_THROTTLED("store_io_throttled", "Requests throttled and retried"); private static final Map<String, Statistic> SYMBOL_MAP = new HashMap<>(Statistic.values().length); http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java new file mode 100644 index 0000000..b3dd4e2 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java @@ -0,0 +1,474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.MultipartUpload; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.PutObjectResult; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.amazonaws.services.s3.model.UploadPartResult; +import com.amazonaws.services.s3.transfer.model.UploadResult; +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.Path; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.hadoop.fs.s3a.Invoker.*; + +/** + * Helper for low-level operations against an S3 Bucket for writing data + * and creating and committing pending writes. + * <p> + * It hides direct access to the S3 API + * and is a location where the object upload process can be evolved/enhanced. + * <p> + * Features + * <ul> + * <li>Methods to create and submit requests to S3, so avoiding + * all direct interaction with the AWS APIs.</li> + * <li>Some extra preflight checks of arguments, so failing fast on + * errors.</li> + * <li>Callbacks to let the FS know of events in the output stream + * upload process.</li> + * <li>Failure handling, including converting exceptions to IOEs.</li> + * <li>Integration with instrumentation and S3Guard.</li> + * </ul> + * + * This API is for internal use only. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class WriteOperationHelper { + private static final Logger LOG = + LoggerFactory.getLogger(WriteOperationHelper.class); + private final S3AFileSystem owner; + private final Invoker invoker; + + /** + * Constructor. + * @param owner owner FS creating the helper + * + */ + protected WriteOperationHelper(S3AFileSystem owner) { + this.owner = owner; + this.invoker = new Invoker(new S3ARetryPolicy(owner.getConf()), + this::operationRetried); + } + + /** + * Callback from {@link Invoker} when an operation is retried. + * @param text text of the operation + * @param ex exception + * @param retries number of retries + * @param idempotent is the method idempotent + */ + void operationRetried(String text, Exception ex, int retries, + boolean idempotent) { + owner.operationRetried(text, ex, retries, idempotent); + } + + /** + * Execute a function with retry processing. 
+ * @param action action to execute (used in error messages) + * @param path path of work (used in error messages) + * @param idempotent does the operation have semantics + * which mean that it can be retried even if was already executed? + * @param operation operation to execute + * @param <T> type of return value + * @return the result of the call + * @throws IOException any IOE raised, or translated exception + */ + public <T> T retry(String action, + String path, + boolean idempotent, + Invoker.Operation<T> operation) + throws IOException { + + return invoker.retry(action, path, idempotent, operation); + } + + /** + * Create a {@link PutObjectRequest} request against the specific key. + * @param destKey destination key + * @param inputStream source data. + * @param length size, if known. Use -1 for not known + * @return the request + */ + public PutObjectRequest createPutObjectRequest(String destKey, + InputStream inputStream, long length) { + return owner.newPutObjectRequest(destKey, + newObjectMetadata(length), + inputStream); + } + + /** + * Create a {@link PutObjectRequest} request to upload a file. + * @param dest key to PUT to. + * @param sourceFile source file + * @return the request + */ + public PutObjectRequest createPutObjectRequest(String dest, + File sourceFile) { + Preconditions.checkState(sourceFile.length() < Integer.MAX_VALUE, + "File length is too big for a single PUT upload"); + return owner.newPutObjectRequest(dest, + newObjectMetadata((int) sourceFile.length()), + sourceFile); + } + + /** + * Callback on a successful write. + * @param length length of the write + */ + public void writeSuccessful(long length) { + } + + /** + * Callback on a write failure. + * @param ex Any exception raised which triggered the failure. + */ + public void writeFailed(Exception ex) { + LOG.debug("Write to {} failed", this, ex); + } + + /** + * Create a new object metadata instance. + * Any standard metadata headers are added here, for example: + * encryption. + * @param length size, if known. Use -1 for not known + * @return a new metadata instance + */ + public ObjectMetadata newObjectMetadata(long length) { + return owner.newObjectMetadata(length); + } + + /** + * Start the multipart upload process. + * Retry policy: retrying, translated. + * @return the upload result containing the ID + * @throws IOException IO problem + */ + @Retries.RetryTranslated + public String initiateMultiPartUpload(String destKey) throws IOException { + LOG.debug("Initiating Multipart upload to {}", destKey); + final InitiateMultipartUploadRequest initiateMPURequest = + new InitiateMultipartUploadRequest(owner.getBucket(), + destKey, + newObjectMetadata(-1)); + initiateMPURequest.setCannedACL(owner.getCannedACL()); + owner.setOptionalMultipartUploadRequestParameters(initiateMPURequest); + + return retry("initiate MultiPartUpload", destKey, true, + () -> owner.initiateMultipartUpload(initiateMPURequest).getUploadId()); + } + + /** + * Finalize a multipart PUT operation. + * This completes the upload, and, if that works, calls + * {@link S3AFileSystem#finishedWrite(String, long)} to update the filesystem. + * Retry policy: retrying, translated. + * @param destKey destination of the commit + * @param uploadId multipart operation Id + * @param partETags list of partial uploads + * @param length length of the upload + * @param retrying retrying callback + * @return the result of the operation. + * @throws IOException on problems. 
+ */ + @Retries.RetryTranslated + private CompleteMultipartUploadResult finalizeMultipartUpload( + String destKey, + String uploadId, + List<PartETag> partETags, + long length, + Retried retrying) throws IOException { + return invoker.retry("Completing multipart commit", destKey, + true, + retrying, + () -> { + // a copy of the list is required, so that the AWS SDK doesn't + // attempt to sort an unmodifiable list. + CompleteMultipartUploadResult result = + owner.getAmazonS3Client().completeMultipartUpload( + new CompleteMultipartUploadRequest(owner.getBucket(), + destKey, + uploadId, + new ArrayList<>(partETags))); + owner.finishedWrite(destKey, length); + return result; + } + ); + } + + /** + * This completes a multipart upload to the destination key via + * {@code finalizeMultipartUpload()}. + * Retry policy: retrying, translated. + * Retries increment the {@code errorCount} counter. + * @param destKey destination + * @param uploadId multipart operation Id + * @param partETags list of partial uploads + * @param length length of the upload + * @param errorCount a counter incremented by 1 on every error; for + * use in statistics + * @return the result of the operation. + * @throws IOException if problems arose which could not be retried, or + * the retry count was exceeded + */ + @Retries.RetryTranslated + public CompleteMultipartUploadResult completeMPUwithRetries( + String destKey, + String uploadId, + List<PartETag> partETags, + long length, + AtomicInteger errorCount) + throws IOException { + checkNotNull(uploadId); + checkNotNull(partETags); + LOG.debug("Completing multipart upload {} with {} parts", + uploadId, partETags.size()); + return finalizeMultipartUpload(destKey, + uploadId, + partETags, + length, + (text, e, r, i) -> errorCount.incrementAndGet()); + } + + /** + * Abort a multipart upload operation. + * @param destKey destination key of the upload + * @param uploadId multipart operation Id + * @param retrying callback invoked on every retry + * @throws IOException failure to abort + * @throws FileNotFoundException if the abort ID is unknown + */ + @Retries.RetryTranslated + public void abortMultipartUpload(String destKey, String uploadId, + Retried retrying) + throws IOException { + invoker.retry("Aborting multipart upload", destKey, true, + retrying, + () -> owner.abortMultipartUpload( + destKey, + uploadId)); + } + + /** + * Abort a multipart commit operation. + * @param upload upload to abort. + * @throws IOException on problems. + */ + @Retries.RetryTranslated + public void abortMultipartUpload(MultipartUpload upload) + throws IOException { + invoker.retry("Aborting multipart commit", upload.getKey(), true, + () -> owner.abortMultipartUpload(upload)); + } + + + /** + * Abort multipart uploads under a path: limited to the first + * few hundred. + * @param prefix prefix for uploads to abort + * @return a count of aborts + * @throws IOException trouble; FileNotFoundExceptions are swallowed. 
+ */ + @Retries.RetryTranslated + public int abortMultipartUploadsUnderPath(String prefix) + throws IOException { + LOG.debug("Aborting multipart uploads under {}", prefix); + int count = 0; + List<MultipartUpload> multipartUploads = owner.listMultipartUploads(prefix); + LOG.debug("Number of outstanding uploads: {}", multipartUploads.size()); + for (MultipartUpload upload: multipartUploads) { + try { + abortMultipartUpload(upload); + count++; + } catch (FileNotFoundException e) { + LOG.debug("Already aborted: {}", upload.getKey(), e); + } + } + return count; + } + + /** + * Abort a multipart commit operation. + * @param destKey destination key of ongoing operation + * @param uploadId multipart operation Id + * @throws IOException on problems. + * @throws FileNotFoundException if the abort ID is unknown + */ + @Retries.RetryTranslated + public void abortMultipartCommit(String destKey, String uploadId) + throws IOException { + abortMultipartUpload(destKey, uploadId, invoker.getRetryCallback()); + } + + /** + * Create and initialize a part request of a multipart upload. + * Exactly one of: {@code uploadStream} or {@code sourceFile} + * must be specified. + * A subset of the file may be posted, by providing the starting point + * in {@code offset} and a length of block in {@code size} equal to + * or less than the remaining bytes. + * @param destKey destination key of ongoing operation + * @param uploadId ID of ongoing upload + * @param partNumber current part number of the upload + * @param size amount of data + * @param uploadStream source of data to upload + * @param sourceFile optional source file. + * @param offset offset in file to start reading. + * @return the request. + */ + public UploadPartRequest newUploadPartRequest( + String destKey, + String uploadId, + int partNumber, + int size, + InputStream uploadStream, + File sourceFile, + Long offset) { + checkNotNull(uploadId); + // exactly one source must be set; xor verifies this + checkArgument((uploadStream != null) ^ (sourceFile != null), + "Data source"); + checkArgument(size >= 0, "Invalid partition size %s", size); + checkArgument(partNumber > 0 && partNumber <= 10000, + "partNumber must be between 1 and 10000 inclusive, but is %s", + partNumber); + + LOG.debug("Creating part upload request for {} #{} size {}", + uploadId, partNumber, size); + UploadPartRequest request = new UploadPartRequest() + .withBucketName(owner.getBucket()) + .withKey(destKey) + .withUploadId(uploadId) + .withPartNumber(partNumber) + .withPartSize(size); + if (uploadStream != null) { + // there's an upload stream. Bind to it. + request.setInputStream(uploadStream); + } else { + checkArgument(sourceFile.exists(), + "Source file does not exist: %s", sourceFile); + checkArgument(offset >= 0, "Invalid offset %s", offset); + long length = sourceFile.length(); + checkArgument(offset == 0 || offset < length, + "Offset %s beyond length of file %s", offset, length); + request.setFile(sourceFile); + request.setFileOffset(offset); + } + return request; + } + + /** + * The toString method is intended to be used in logging/toString calls. + * @return a string description. + */ + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "WriteOperationHelper {bucket=").append(owner.getBucket()); + sb.append('}'); + return sb.toString(); + } + + /** + * PUT an object directly (i.e. not via the transfer manager). + * Byte length is calculated from the file length, or, if there is no + * file, from the content length of the header. 
+ * @param putObjectRequest the request + * @return the upload initiated + * @throws IOException on problems + */ + @Retries.RetryTranslated + public PutObjectResult putObject(PutObjectRequest putObjectRequest) + throws IOException { + return retry("put", + putObjectRequest.getKey(), true, + () -> owner.putObjectDirect(putObjectRequest)); + } + + /** + * PUT an object via the transfer manager. + * @param putObjectRequest the request + * @return the result of the operation + * @throws IOException on problems + */ + @Retries.OnceTranslated + public UploadResult uploadObject(PutObjectRequest putObjectRequest) + throws IOException { + // no retry; rely on xfer manager logic + return retry("put", + putObjectRequest.getKey(), true, + () -> owner.executePut(putObjectRequest, null)); + } + + /** + * Revert a commit by deleting the file. + * Relies on retry code in filesystem + * @throws IOException on problems + * @param destKey destination key + */ + @Retries.RetryTranslated + public void revertCommit(String destKey) throws IOException { + once("revert commit", destKey, + () -> { + Path destPath = owner.keyToQualifiedPath(destKey); + owner.deleteObjectAtPath(destPath, + destKey, true); + owner.maybeCreateFakeParentDirectory(destPath); + } + ); + } + + /** + * Upload part of a multi-partition file. + * @param request request + * @return the result of the operation. + * @throws IOException on problems + */ + @Retries.RetryTranslated + public UploadPartResult uploadPart(UploadPartRequest request) + throws IOException { + return retry("upload part", + request.getKey(), + true, + () -> owner.uploadPart(request)); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java new file mode 100644 index 0000000..3277916 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java @@ -0,0 +1,756 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.commit; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import com.amazonaws.services.s3.model.MultipartUpload; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.Invoker; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.commit.files.PendingSet; +import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; +import org.apache.hadoop.fs.s3a.commit.files.SuccessData; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter; +import org.apache.hadoop.net.NetUtils; + +import static org.apache.hadoop.fs.s3a.Invoker.ignoreIOExceptions; +import static org.apache.hadoop.fs.s3a.S3AUtils.*; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; +import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*; +import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*; + +/** + * Abstract base class for S3A committers; allows for any commonality + * between different architectures. + * + * Although the committer APIs allow for a committer to be created without + * an output path, this is not supported in this class or its subclasses: + * a destination must be supplied. It is left to the committer factory + * to handle the creation of a committer when the destination is unknown. + * + * Requiring an output directory simplifies coding and testing. + */ +public abstract class AbstractS3ACommitter extends PathOutputCommitter { + private static final Logger LOG = + LoggerFactory.getLogger(AbstractS3ACommitter.class); + + /** + * Thread pool for task execution. + */ + private ExecutorService threadPool; + + /** Underlying commit operations. */ + private final CommitOperations commitOperations; + + /** + * Final destination of work. + */ + private Path outputPath; + + /** + * Role: used in log/text messages. + */ + private final String role; + + /** + * This is the directory for all intermediate work: where the output format + * will write data. + * <i>This may not be on the final file system</i> + */ + private Path workPath; + + /** Configuration of the job. */ + private Configuration conf; + + /** Filesystem of {@link #outputPath}. */ + private FileSystem destFS; + + /** The job context. For a task, this can be cast to a TaskContext. */ + private final JobContext jobContext; + + /** Should a job marker be created? */ + private final boolean createJobMarker; + + /** + * Create a committer. + * This constructor binds the destination directory and configuration, but + * does not update the work path: That must be calculated by the + * implementation; + * It is omitted here to avoid subclass methods being called too early. + * @param outputPath the job's output path: MUST NOT be null. 
+ * @param context the task's context + * @throws IOException on a failure + */ + protected AbstractS3ACommitter( + Path outputPath, + TaskAttemptContext context) throws IOException { + super(outputPath, context); + Preconditions.checkArgument(outputPath != null, "null output path"); + Preconditions.checkArgument(context != null, "null job context"); + this.jobContext = context; + this.role = "Task committer " + context.getTaskAttemptID(); + setConf(context.getConfiguration()); + initOutput(outputPath); + LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}", + role, jobName(context), jobIdString(context), outputPath); + S3AFileSystem fs = getDestS3AFS(); + createJobMarker = context.getConfiguration().getBoolean( + CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, + DEFAULT_CREATE_SUCCESSFUL_JOB_DIR_MARKER); + commitOperations = new CommitOperations(fs); + } + + /** + * Init the output filesystem and path. + * TESTING ONLY; allows mock FS to cheat. + * @param out output path + * @throws IOException failure to create the FS. + */ + @VisibleForTesting + protected void initOutput(Path out) throws IOException { + FileSystem fs = getDestinationFS(out, getConf()); + setDestFS(fs); + setOutputPath(fs.makeQualified(out)); + } + + /** + * Get the job/task context this committer was instantiated with. + * @return the context. + */ + public final JobContext getJobContext() { + return jobContext; + } + + /** + * Final path of output, in the destination FS. + * @return the path + */ + @Override + public final Path getOutputPath() { + return outputPath; + } + + /** + * Set the output path. + * @param outputPath new value + */ + protected final void setOutputPath(Path outputPath) { + Preconditions.checkNotNull(outputPath, "Null output path"); + this.outputPath = outputPath; + } + + /** + * This is the critical method for {@code FileOutputFormat}; it declares + * the path for work. + * @return the working path. + */ + @Override + public Path getWorkPath() { + return workPath; + } + + /** + * Set the work path for this committer. + * @param workPath the work path to use. + */ + protected void setWorkPath(Path workPath) { + LOG.debug("Setting work path to {}", workPath); + this.workPath = workPath; + } + + public Configuration getConf() { + return conf; + } + + protected void setConf(Configuration conf) { + this.conf = conf; + } + + /** + * Get the destination FS, creating it on demand if needed. + * @return the filesystem; requires the output path to be set up + * @throws IOException if the FS cannot be instantiated. + */ + public FileSystem getDestFS() throws IOException { + if (destFS == null) { + FileSystem fs = getDestinationFS(outputPath, getConf()); + setDestFS(fs); + } + return destFS; + } + + /** + * Get the destination as an S3A Filesystem; casting it. + * @return the dest S3A FS. + * @throws IOException if the FS cannot be instantiated. + */ + public S3AFileSystem getDestS3AFS() throws IOException { + return (S3AFileSystem) getDestFS(); + } + + /** + * Set the destination FS: the FS of the final output. + * @param destFS destination FS. + */ + protected void setDestFS(FileSystem destFS) { + this.destFS = destFS; + } + + /** + * Compute the path where the output of a given job attempt will be placed. + * @param context the context of the job. This is used to get the + * application attempt ID. + * @return the path to store job attempt data. 
+ */ + public Path getJobAttemptPath(JobContext context) { + return getJobAttemptPath(getAppAttemptId(context)); + } + + /** + * Compute the path where the output of a given job attempt will be placed. + * @param appAttemptId the ID of the application attempt for this job. + * @return the path to store job attempt data. + */ + protected abstract Path getJobAttemptPath(int appAttemptId); + + /** + * Compute the path where the output of a task attempt is stored until + * that task is committed. This may be the normal Task attempt path + * or it may be a subdirectory. + * The default implementation returns the value of + * {@link #getBaseTaskAttemptPath(TaskAttemptContext)}; + * subclasses may return different values. + * @param context the context of the task attempt. + * @return the path where a task attempt should be stored. + */ + public Path getTaskAttemptPath(TaskAttemptContext context) { + return getBaseTaskAttemptPath(context); + } + + /** + * Compute the base path where the output of a task attempt is written. + * This is the path which will be deleted when a task is cleaned up and + * aborted. + * + * @param context the context of the task attempt. + * @return the path where a task attempt should be stored. + */ + protected abstract Path getBaseTaskAttemptPath(TaskAttemptContext context); + + /** + * Get a temporary directory for data. When a task is aborted/cleaned + * up, the contents of this directory are all deleted. + * @param context task context + * @return a path for temporary data. + */ + public abstract Path getTempTaskAttemptPath(TaskAttemptContext context); + + /** + * Get the name of this committer. + * @return the committer name. + */ + public abstract String getName(); + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "AbstractS3ACommitter{"); + sb.append("role=").append(role); + sb.append(", name").append(getName()); + sb.append(", outputPath=").append(getOutputPath()); + sb.append(", workPath=").append(workPath); + sb.append('}'); + return sb.toString(); + } + + /** + * Get the destination filesystem from the output path and the configuration. + * @param out output path + * @param config job/task config + * @return the associated FS + * @throws PathCommitException output path isn't to an S3A FS instance. + * @throws IOException failure to instantiate the FS. + */ + protected FileSystem getDestinationFS(Path out, Configuration config) + throws IOException { + return getS3AFileSystem(out, config, + requiresDelayedCommitOutputInFileSystem()); + } + + /** + * Flag to indicate whether or not the destination filesystem needs + * to be configured to support magic paths where the output isn't immediately + * visible. If the committer returns true, then committer setup will + * fail if the FS doesn't have the capability. + * Base implementation returns false. + * @return what the requirements of the committer are of the filesystem. + */ + protected boolean requiresDelayedCommitOutputInFileSystem() { + return false; + } + /** + * Task recovery considered unsupported: Warn and fail. + * @param taskContext Context of the task whose output is being recovered + * @throws IOException always. 
+ */ + @Override + public void recoverTask(TaskAttemptContext taskContext) throws IOException { + LOG.warn("Cannot recover task {}", taskContext.getTaskAttemptID()); + throw new PathCommitException(outputPath, + String.format("Unable to recover task %s", + taskContext.getTaskAttemptID())); + } + + /** + * if the job requires a success marker on a successful job, + * create the file {@link CommitConstants#_SUCCESS}. + * + * While the classic committers create a 0-byte file, the S3Guard committers + * PUT up a the contents of a {@link SuccessData} file. + * @param context job context + * @param pending the pending commits + * @throws IOException IO failure + */ + protected void maybeCreateSuccessMarkerFromCommits(JobContext context, + List<SinglePendingCommit> pending) throws IOException { + List<String> filenames = new ArrayList<>(pending.size()); + for (SinglePendingCommit commit : pending) { + String key = commit.getDestinationKey(); + if (!key.startsWith("/")) { + // fix up so that FS.makeQualified() sets up the path OK + key = "/" + key; + } + filenames.add(key); + } + maybeCreateSuccessMarker(context, filenames); + } + + /** + * if the job requires a success marker on a successful job, + * create the file {@link CommitConstants#_SUCCESS}. + * + * While the classic committers create a 0-byte file, the S3Guard committers + * PUT up a the contents of a {@link SuccessData} file. + * @param context job context + * @param filenames list of filenames. + * @throws IOException IO failure + */ + protected void maybeCreateSuccessMarker(JobContext context, + List<String> filenames) + throws IOException { + if (createJobMarker) { + // create a success data structure and then save it + SuccessData successData = new SuccessData(); + successData.setCommitter(getName()); + successData.setDescription(getRole()); + successData.setHostname(NetUtils.getLocalHostname()); + Date now = new Date(); + successData.setTimestamp(now.getTime()); + successData.setDate(now.toString()); + successData.setFilenames(filenames); + commitOperations.createSuccessMarker(getOutputPath(), successData, true); + } + } + + /** + * Base job setup deletes the success marker. + * TODO: Do we need this? + * @param context context + * @throws IOException IO failure + */ +/* + + @Override + public void setupJob(JobContext context) throws IOException { + if (createJobMarker) { + try (DurationInfo d = new DurationInfo("Deleting _SUCCESS marker")) { + commitOperations.deleteSuccessMarker(getOutputPath()); + } + } + } +*/ + + @Override + public void setupTask(TaskAttemptContext context) throws IOException { + try (DurationInfo d = new DurationInfo(LOG, "Setup Task %s", + context.getTaskAttemptID())) { + Path taskAttemptPath = getTaskAttemptPath(context); + FileSystem fs = getTaskAttemptFilesystem(context); + fs.mkdirs(taskAttemptPath); + } + } + + /** + * Get the task attempt path filesystem. This may not be the same as the + * final destination FS, and so may not be an S3A FS. + * @param context task attempt + * @return the filesystem + * @throws IOException failure to instantiate + */ + protected FileSystem getTaskAttemptFilesystem(TaskAttemptContext context) + throws IOException { + return getTaskAttemptPath(context).getFileSystem(getConf()); + } + + /** + * Commit a list of pending uploads. 
+ * @param context job context + * @param pending list of pending uploads + * @throws IOException on any failure + */ + protected void commitPendingUploads(JobContext context, + List<SinglePendingCommit> pending) throws IOException { + if (pending.isEmpty()) { + LOG.warn("{}: No pending uploads to commit", getRole()); + } + LOG.debug("{}: committing the output of {} task(s)", + getRole(), pending.size()); + Tasks.foreach(pending) + .stopOnFailure() + .executeWith(buildThreadPool(context)) + .onFailure((commit, exception) -> + getCommitOperations().abortSingleCommit(commit)) + .abortWith(commit -> getCommitOperations().abortSingleCommit(commit)) + .revertWith(commit -> getCommitOperations().revertCommit(commit)) + .run(commit -> getCommitOperations().commitOrFail(commit)); + } + + /** + * Try to read every pendingset file and build a list of them/ + * In the case of a failure to read the file, exceptions are held until all + * reads have been attempted. + * @param context job context + * @param suppressExceptions whether to suppress exceptions. + * @param fs job attempt fs + * @param pendingCommitFiles list of files found in the listing scan + * @return the list of commits + * @throws IOException on a failure when suppressExceptions is false. + */ + protected List<SinglePendingCommit> loadPendingsetFiles( + JobContext context, + boolean suppressExceptions, + FileSystem fs, + Iterable<? extends FileStatus> pendingCommitFiles) throws IOException { + + final List<SinglePendingCommit> pending = Collections.synchronizedList( + Lists.newArrayList()); + Tasks.foreach(pendingCommitFiles) + .suppressExceptions(suppressExceptions) + .executeWith(buildThreadPool(context)) + .run(pendingCommitFile -> + pending.addAll( + PendingSet.load(fs, pendingCommitFile.getPath()).getCommits()) + ); + return pending; + } + + /** + * Internal Job commit operation: where the S3 requests are made + * (potentially in parallel). + * @param context job context + * @param pending pending request + * @throws IOException any failure + */ + protected void commitJobInternal(JobContext context, + List<SinglePendingCommit> pending) + throws IOException { + + commitPendingUploads(context, pending); + } + + @Override + public void abortJob(JobContext context, JobStatus.State state) + throws IOException { + LOG.info("{}: aborting job {} in state {}", + getRole(), jobIdString(context), state); + // final cleanup operations + abortJobInternal(context, false); + } + + + /** + * The internal job abort operation; can be overridden in tests. + * This must clean up operations; it is called when a commit fails, as + * well as in an {@link #abortJob(JobContext, JobStatus.State)} call. + * The base implementation calls {@link #cleanup(JobContext, boolean)} + * @param context job context + * @param suppressExceptions should exceptions be suppressed? + * @throws IOException any IO problem raised when suppressExceptions is false. + */ + protected void abortJobInternal(JobContext context, + boolean suppressExceptions) + throws IOException { + cleanup(context, suppressExceptions); + } + + /** + * Abort all pending uploads to the destination directory during + * job cleanup operations. 
+ * @param suppressExceptions should exceptions be suppressed + */ + protected void abortPendingUploadsInCleanup( + boolean suppressExceptions) throws IOException { + Path dest = getOutputPath(); + try (DurationInfo d = + new DurationInfo(LOG, "Aborting all pending commits under %s", + dest)) { + CommitOperations ops = getCommitOperations(); + List<MultipartUpload> pending = ops + .listPendingUploadsUnderPath(dest); + Tasks.foreach(pending) + .executeWith(buildThreadPool(getJobContext())) + .suppressExceptions(suppressExceptions) + .run(u -> ops.abortMultipartCommit(u.getKey(), u.getUploadId())); + } + } + + /** + * Subclass-specific pre commit actions. + * @param context job context + * @param pending the pending operations + * @throws IOException any failure + */ + protected void preCommitJob(JobContext context, + List<SinglePendingCommit> pending) throws IOException { + } + + /** + * Commit work. + * This consists of two stages: precommit and commit. + * <p> + * Precommit: identify pending uploads, then allow subclasses + * to validate the state of the destination and the pending uploads. + * Any failure here triggers an abort of all pending uploads. + * <p> + * Commit internal: do the final commit sequence. + * <p> + * The final commit action is to build the {@code __SUCCESS} file entry. + * </p> + * @param context job context + * @throws IOException any failure + */ + @Override + public void commitJob(JobContext context) throws IOException { + String id = jobIdString(context); + try (DurationInfo d = new DurationInfo(LOG, + "%s: commitJob(%s)", getRole(), id)) { + List<SinglePendingCommit> pending + = listPendingUploadsToCommit(context); + preCommitJob(context, pending); + commitJobInternal(context, pending); + jobCompleted(true); + maybeCreateSuccessMarkerFromCommits(context, pending); + cleanup(context, false); + } catch (IOException e) { + LOG.warn("Commit failure for job {}", id, e); + jobCompleted(false); + abortJobInternal(context, true); + throw e; + } + } + + /** + * Job completion outcome; this may be subclassed in tests. + * @param success did the job succeed. + */ + protected void jobCompleted(boolean success) { + getCommitOperations().jobCompleted(success); + } + + /** + * Clean up any staging directories. + * IOEs must be caught and swallowed. + */ + public abstract void cleanupStagingDirs(); + + /** + * Get the list of pending uploads for this job attempt. + * @param context job context + * @return a list of pending uploads. + * @throws IOException Any IO failure + */ + protected abstract List<SinglePendingCommit> listPendingUploadsToCommit( + JobContext context) + throws IOException; + + /** + * Cleanup the job context, including aborting anything pending. + * @param context job context + * @param suppressExceptions should exceptions be suppressed? + * @throws IOException any failure if exceptions were not suppressed. 
+ */ + protected void cleanup(JobContext context, + boolean suppressExceptions) throws IOException { + try (DurationInfo d = new DurationInfo(LOG, + "Cleanup job %s", jobIdString(context))) { + abortPendingUploadsInCleanup(suppressExceptions); + } finally { + cleanupStagingDirs(); + } + } + + @Override + @SuppressWarnings("deprecation") + public void cleanupJob(JobContext context) throws IOException { + String r = getRole(); + String id = jobIdString(context); + LOG.warn("{}: using deprecated cleanupJob call for {}", r, id); + try (DurationInfo d = new DurationInfo(LOG, "%s: cleanup Job %s", r, id)) { + cleanup(context, true); + } + } + + /** + * Execute an operation; maybe suppress any raised IOException. + * @param suppress should raised IOEs be suppressed? + * @param action action (for logging when the IOE is supressed. + * @param operation operation + * @throws IOException if operation raised an IOE and suppress == false + */ + protected void maybeIgnore( + boolean suppress, + String action, + Invoker.VoidOperation operation) throws IOException { + if (suppress) { + ignoreIOExceptions(LOG, action, "", operation); + } else { + operation.execute(); + } + } + + /** + * Execute an operation; maybe suppress any raised IOException. + * @param suppress should raised IOEs be suppressed? + * @param action action (for logging when the IOE is suppressed. + * @param ex exception + * @throws IOException if suppress == false + */ + protected void maybeIgnore( + boolean suppress, + String action, + IOException ex) throws IOException { + if (suppress) { + LOG.info(action, ex); + } else { + throw ex; + } + } + + /** + * Get the commit actions instance. + * Subclasses may provide a mock version of this. + * @return the commit actions instance to use for operations. + */ + protected CommitOperations getCommitOperations() { + return commitOperations; + } + + /** + * Used in logging and reporting to help disentangle messages. + * @return the committer's role. + */ + protected String getRole() { + return role; + } + + /** + * Returns an {@link ExecutorService} for parallel tasks. The number of + * threads in the thread-pool is set by s3.multipart.committer.num-threads. + * If num-threads is 0, this will return null; + * + * @param context the JobContext for this commit + * @return an {@link ExecutorService} or null for the number of threads + */ + protected final synchronized ExecutorService buildThreadPool( + JobContext context) { + + if (threadPool == null) { + int numThreads = context.getConfiguration().getInt( + FS_S3A_COMMITTER_THREADS, + DEFAULT_COMMITTER_THREADS); + LOG.debug("{}: creating thread pool of size {}", getRole(), numThreads); + if (numThreads > 0) { + threadPool = Executors.newFixedThreadPool(numThreads, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("s3-committer-pool-%d") + .build()); + } else { + return null; + } + } + return threadPool; + } + + /** + * Delete the task attempt path without raising any errors. + * @param context task context + */ + protected void deleteTaskAttemptPathQuietly(TaskAttemptContext context) { + Path attemptPath = getBaseTaskAttemptPath(context); + ignoreIOExceptions(LOG, "Delete task attempt path", attemptPath.toString(), + () -> deleteQuietly( + getTaskAttemptFilesystem(context), attemptPath, true)); + } + + /** + * Abort all pending uploads in the list. 
+ * @param context job context + * @param pending pending uploads + * @param suppressExceptions should exceptions be suppressed + * @throws IOException any exception raised + */ + protected void abortPendingUploads(JobContext context, + List<SinglePendingCommit> pending, + boolean suppressExceptions) + throws IOException { + if (pending == null || pending.isEmpty()) { + LOG.info("{}: no pending commits to abort", getRole()); + } else { + try (DurationInfo d = new DurationInfo(LOG, + "Aborting %s uploads", pending.size())) { + Tasks.foreach(pending) + .executeWith(buildThreadPool(context)) + .suppressExceptions(suppressExceptions) + .run(commit -> getCommitOperations().abortSingleCommit(commit)); + } + } + } + +}
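To make the new committer API above more concrete, here is a minimal, hypothetical subclass of AbstractS3ACommitter. It is not part of this patch: the class name, package, and directory layout are invented for illustration, and the OutputCommitter methods that the shown portion of the patch leaves to concrete committers (setupJob, needsTaskCommit, commitTask, abortTask) are given trivial stubs purely so the sketch compiles.

package org.apache.hadoop.fs.s3a.commit.example;  // hypothetical package

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Hypothetical committer illustrating the abstract surface of
 * AbstractS3ACommitter as added by this patch; it is not one of the
 * committers the patch provides. The destination path must resolve to an
 * S3AFileSystem, because the base constructor casts the destination FS.
 */
public class ExampleS3ACommitter extends AbstractS3ACommitter {

  public ExampleS3ACommitter(Path outputPath, TaskAttemptContext context)
      throws IOException {
    super(outputPath, context);
  }

  @Override
  public String getName() {
    return "example";
  }

  /** Job attempt data under a _temporary dir; layout chosen for illustration. */
  @Override
  protected Path getJobAttemptPath(int appAttemptId) {
    return new Path(getOutputPath(), "_temporary/" + appAttemptId);
  }

  /** Each task attempt gets its own directory under the job attempt path. */
  @Override
  protected Path getBaseTaskAttemptPath(TaskAttemptContext context) {
    return new Path(getJobAttemptPath(context),
        context.getTaskAttemptID().toString());
  }

  @Override
  public Path getTempTaskAttemptPath(TaskAttemptContext context) {
    return new Path(getBaseTaskAttemptPath(context), "_temp");
  }

  /** Nothing is staged locally, so there is nothing to clean up. */
  @Override
  public void cleanupStagingDirs() {
  }

  /** A real committer would load .pendingset files from the job attempt dir. */
  @Override
  protected List<SinglePendingCommit> listPendingUploadsToCommit(
      JobContext context) throws IOException {
    return Collections.emptyList();
  }

  // The OutputCommitter obligations below are not implemented by the base
  // class in the portion of the patch shown above, so a concrete committer
  // must supply them; these stubs exist only to keep the sketch compilable.

  @Override
  public void setupJob(JobContext context) throws IOException {
    FileSystem fs = getDestFS();
    fs.mkdirs(getJobAttemptPath(context));
  }

  @Override
  public boolean needsTaskCommit(TaskAttemptContext context)
      throws IOException {
    return getTaskAttemptFilesystem(context)
        .exists(getTaskAttemptPath(context));
  }

  @Override
  public void commitTask(TaskAttemptContext context) throws IOException {
    // a real committer would persist this task's pending uploads here
  }

  @Override
  public void abortTask(TaskAttemptContext context) throws IOException {
    deleteTaskAttemptPathQuietly(context);
  }
}

With only these methods supplied, job commit is driven by the base class: commitJob() loads the pending uploads returned by listPendingUploadsToCommit(), commits them in parallel on the thread pool sized by FS_S3A_COMMITTER_THREADS, writes the _SUCCESS marker when CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER is enabled, and on failure aborts via abortJobInternal() and cleanup().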