[
https://issues.apache.org/jira/browse/HADOOP-17271?focusedWorklogId=514572&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-514572
]
ASF GitHub Bot logged work on HADOOP-17271:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 20/Nov/20 11:57
Start Date: 20/Nov/20 11:57
Worklog Time Spent: 10m
Work Description: bgaborg commented on a change in pull request #2324:
URL: https://github.com/apache/hadoop/pull/2324#discussion_r526759387
##########
File path:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java
##########
@@ -102,21 +104,32 @@ public boolean outputImmediatelyVisible() {
* @param uploadId Upload ID
* @param parts list of parts
* @param bytesWritten bytes written
+ * @param iostatistics nullable IO statistics
* @return false, indicating that the commit must fail.
* @throws IOException any IO problem.
* @throws IllegalArgumentException bad argument
*/
@Override
public boolean aboutToComplete(String uploadId,
List<PartETag> parts,
- long bytesWritten)
+ long bytesWritten,
+ final IOStatistics iostatistics)
throws IOException {
Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId),
"empty/null upload ID: "+ uploadId);
Preconditions.checkArgument(parts != null,
"No uploaded parts list");
Preconditions.checkArgument(!parts.isEmpty(),
"No uploaded parts to save");
+
+ // put a 0-byte file with the name of the original under-magic path
Review comment:
Why does this need to be added together with the IOStat feature? Is it
related somehow?
##########
File path:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
##########
@@ -72,36 +99,123 @@ public AmazonS3 createS3Client(URI name,
if (!StringUtils.isEmpty(userAgentSuffix)) {
awsConf.setUserAgentSuffix(userAgentSuffix);
}
- return configureAmazonS3Client(
- newAmazonS3Client(credentials, awsConf), conf);
+ // optional metrics
+ RequestMetricCollector metrics = statisticsFromAwsSdk != null
+ ? new AwsStatisticsCollector(statisticsFromAwsSdk)
+ : null;
+
+ return newAmazonS3Client(
+ credentials,
+ awsConf,
+ metrics,
+ conf.getTrimmed(ENDPOINT, ""),
+ conf.getBoolean(PATH_STYLE_ACCESS, false));
}
/**
- * Wrapper around constructor for {@link AmazonS3} client.
+ * Create an {@link AmazonS3} client.
* Override this to provide an extended version of the client
* @param credentials credentials to use
* @param awsConf AWS configuration
- * @return new AmazonS3 client
+ * @param metrics metrics collector or null
+ * @param endpoint endpoint string; may be ""
+ * @param pathStyleAccess enable path style access?
+ * @return new AmazonS3 client
*/
protected AmazonS3 newAmazonS3Client(
- AWSCredentialsProvider credentials, ClientConfiguration awsConf) {
- return new AmazonS3Client(credentials, awsConf);
+ final AWSCredentialsProvider credentials,
+ final ClientConfiguration awsConf,
+ final RequestMetricCollector metrics,
+ final String endpoint,
+ final boolean pathStyleAccess) {
+ if (metrics != null) {
+ LOG.debug("Building S3 client using the SDK builder API");
+ return buildAmazonS3Client(credentials, awsConf, metrics, endpoint,
+ pathStyleAccess);
+ } else {
+ LOG.debug("Building S3 client using the SDK builder API");
+ return classicAmazonS3Client(credentials, awsConf, endpoint,
+ pathStyleAccess);
+ }
}
/**
- * Configure S3 client from the Hadoop configuration.
- *
+ * Use the (newer) Builder SDK to create an AWS S3 client.
+ * <p>
+ * This has a more complex endpoint configuration, which does
+ * not yet work in this code without triggering regressions.
+ * So it is only used when SDK metrics are supplied.
+ * @param credentials credentials to use
+ * @param awsConf AWS configuration
+ * @param metrics metrics collector or null
+ * @param endpoint endpoint string; may be ""
+ * @param pathStyleAccess enable path style access?
+ * @return new AmazonS3 client
+ */
+ private AmazonS3 buildAmazonS3Client(
Review comment:
This change seems unrelated. Please explain why this change is needed
for the stat API
##########
File path:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
##########
@@ -1017,68 +1198,63 @@ void blockUploadCompleted(long duration, int blockSize)
{
* A final transfer completed event is still expected, so this
* does not decrement the active block counter.
*/
- void blockUploadFailed(long duration, int blockSize) {
- blockUploadsFailed.incrementAndGet();
+ @Override
+ public void blockUploadFailed(long duration, int blockSize) {
+ incCounter(StreamStatisticNames.STREAM_WRITE_EXCEPTIONS);
}
/** Intermediate report of bytes uploaded. */
- void bytesTransferred(long byteCount) {
+ @Override
+ public void bytesTransferred(long byteCount) {
bytesUploaded.addAndGet(byteCount);
- statistics.incrementBytesWritten(byteCount);
bytesPendingUpload.addAndGet(-byteCount);
incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING, -byteCount);
}
- /**
- * Note exception in a multipart complete.
- * @param count count of exceptions
- */
- void exceptionInMultipartComplete(int count) {
+ @Override
+ public void exceptionInMultipartComplete(int count) {
if (count > 0) {
- exceptionsInMultipartFinalize.addAndGet(count);
+ incCounter(
+ STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS.getSymbol(),
+ count);
}
}
- /**
- * Note an exception in a multipart abort.
- */
- void exceptionInMultipartAbort() {
- exceptionsInMultipartFinalize.incrementAndGet();
+ @Override
+ public void exceptionInMultipartAbort() {
+ incCounter(
+ STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS.getSymbol());
}
- /**
- * Get the number of bytes pending upload.
- * @return the number of bytes in the pending upload state.
- */
+ @Override
Review comment:
Maybe we could keep the javadocs here
##########
File path:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
##########
@@ -1017,68 +1198,63 @@ void blockUploadCompleted(long duration, int blockSize)
{
* A final transfer completed event is still expected, so this
* does not decrement the active block counter.
*/
- void blockUploadFailed(long duration, int blockSize) {
- blockUploadsFailed.incrementAndGet();
+ @Override
+ public void blockUploadFailed(long duration, int blockSize) {
+ incCounter(StreamStatisticNames.STREAM_WRITE_EXCEPTIONS);
}
/** Intermediate report of bytes uploaded. */
- void bytesTransferred(long byteCount) {
+ @Override
+ public void bytesTransferred(long byteCount) {
bytesUploaded.addAndGet(byteCount);
- statistics.incrementBytesWritten(byteCount);
bytesPendingUpload.addAndGet(-byteCount);
incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING, -byteCount);
}
- /**
- * Note exception in a multipart complete.
- * @param count count of exceptions
- */
- void exceptionInMultipartComplete(int count) {
+ @Override
+ public void exceptionInMultipartComplete(int count) {
if (count > 0) {
- exceptionsInMultipartFinalize.addAndGet(count);
+ incCounter(
+ STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS.getSymbol(),
+ count);
}
}
- /**
- * Note an exception in a multipart abort.
- */
- void exceptionInMultipartAbort() {
- exceptionsInMultipartFinalize.incrementAndGet();
+ @Override
+ public void exceptionInMultipartAbort() {
+ incCounter(
+ STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS.getSymbol());
}
- /**
- * Get the number of bytes pending upload.
- * @return the number of bytes in the pending upload state.
- */
+ @Override
public long getBytesPendingUpload() {
return bytesPendingUpload.get();
}
- /**
- * Data has been uploaded to be committed in a subsequent operation;
- * to be called at the end of the write.
- * @param size size in bytes
- */
+ @Override
Review comment:
Maybe we could keep the javadocs here
##########
File path: hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
##########
@@ -75,3 +75,11 @@
log4j.logger.org.apache.hadoop.fs.s3a.s3guard.Operations=DEBUG
log4j.logger.org.apache.hadoop.mapreduce.lib.output=DEBUG
log4j.logger.org.apache.hadoop.fs.s3a.S3AStorageStatistics=INFO
+
+# Set to debug if you need to debug S3A endpoint problems.
+#log4j.logger.org.apache.hadoop.fs.s3a.DefaultS3ClientFactory=DEBUG
+
+# This causes all remote iterator stats
+# to be logged when the RemoteIterators.foreach() method is
+# invoked
+log4j.logger.org.apache.hadoop.util.functional.RemoteIterators=DEBUG
Review comment:
Maybe INFO-level logging would be better here, or is DEBUG needed to show
all the stats?
##########
File path: hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
##########
@@ -75,3 +75,11 @@
log4j.logger.org.apache.hadoop.fs.s3a.s3guard.Operations=DEBUG
log4j.logger.org.apache.hadoop.mapreduce.lib.output=DEBUG
log4j.logger.org.apache.hadoop.fs.s3a.S3AStorageStatistics=INFO
+
+# Set to debug if you need to debug S3A endpoint problems.
+#log4j.logger.org.apache.hadoop.fs.s3a.DefaultS3ClientFactory=DEBUG
+
+# This causes all remote iterator stats
+# to be logged when the RemoteIterators.foreach() method is
+# invoked
+log4j.logger.org.apache.hadoop.util.functional.RemoteIterators=DEBUG
Review comment:
(I know this is for testing.)
##########
File path:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
##########
@@ -1017,68 +1198,63 @@ void blockUploadCompleted(long duration, int blockSize)
{
* A final transfer completed event is still expected, so this
* does not decrement the active block counter.
*/
- void blockUploadFailed(long duration, int blockSize) {
- blockUploadsFailed.incrementAndGet();
+ @Override
+ public void blockUploadFailed(long duration, int blockSize) {
+ incCounter(StreamStatisticNames.STREAM_WRITE_EXCEPTIONS);
}
/** Intermediate report of bytes uploaded. */
- void bytesTransferred(long byteCount) {
+ @Override
+ public void bytesTransferred(long byteCount) {
bytesUploaded.addAndGet(byteCount);
- statistics.incrementBytesWritten(byteCount);
bytesPendingUpload.addAndGet(-byteCount);
incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING, -byteCount);
}
- /**
- * Note exception in a multipart complete.
- * @param count count of exceptions
- */
- void exceptionInMultipartComplete(int count) {
+ @Override
+ public void exceptionInMultipartComplete(int count) {
if (count > 0) {
- exceptionsInMultipartFinalize.addAndGet(count);
+ incCounter(
+ STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS.getSymbol(),
+ count);
}
}
- /**
- * Note an exception in a multipart abort.
- */
- void exceptionInMultipartAbort() {
- exceptionsInMultipartFinalize.incrementAndGet();
+ @Override
+ public void exceptionInMultipartAbort() {
+ incCounter(
+ STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS.getSymbol());
}
- /**
- * Get the number of bytes pending upload.
- * @return the number of bytes in the pending upload state.
- */
+ @Override
public long getBytesPendingUpload() {
return bytesPendingUpload.get();
}
- /**
- * Data has been uploaded to be committed in a subsequent operation;
- * to be called at the end of the write.
- * @param size size in bytes
- */
+ @Override
public void commitUploaded(long size) {
incrementCounter(COMMITTER_BYTES_UPLOADED, size);
}
- /**
- * Output stream has closed.
- * Trigger merge in of all statistics not updated during operation.
- */
@Override
Review comment:
Maybe we could keep the javadocs here
##########
File path:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
##########
@@ -1017,68 +1198,63 @@ void blockUploadCompleted(long duration, int blockSize)
{
* A final transfer completed event is still expected, so this
* does not decrement the active block counter.
*/
- void blockUploadFailed(long duration, int blockSize) {
- blockUploadsFailed.incrementAndGet();
+ @Override
+ public void blockUploadFailed(long duration, int blockSize) {
+ incCounter(StreamStatisticNames.STREAM_WRITE_EXCEPTIONS);
}
/** Intermediate report of bytes uploaded. */
- void bytesTransferred(long byteCount) {
+ @Override
+ public void bytesTransferred(long byteCount) {
bytesUploaded.addAndGet(byteCount);
- statistics.incrementBytesWritten(byteCount);
bytesPendingUpload.addAndGet(-byteCount);
incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING, -byteCount);
}
- /**
- * Note exception in a multipart complete.
- * @param count count of exceptions
- */
- void exceptionInMultipartComplete(int count) {
+ @Override
Review comment:
Maybe we could keep the javadocs here
##########
File path:
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/RemoteIterators.java
##########
@@ -0,0 +1,698 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.functional;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.io.IOUtils;
+
+import static java.util.Objects.requireNonNull;
+import static
org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtDebug;
+import static
org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
+
+/**
+ * A set of remote iterators supporting transformation and filtering,
+ * with IOStatisticsSource passthrough, and of conversions of
+ * the iterators to lists/arrays and of performing actions
+ * on the values.
+ * <p></p>
+ * This aims to make it straightforward to use lambda-expressions to
+ * transform the results of an iterator, without losing the statistics
+ * in the process, and to chain the operations together.
+ * <p></p>
+ * The closeable operation will be passed through RemoteIterators which
+ * wrap other RemoteIterators. This is to support any iterator which
+ * can be closed to release held connections, file handles etc.
+ * Unless client code is written to assume that RemoteIterator instances
+ * may be closed, this is not likely to be broadly used. It is added
+ * to make it possible to adopt this feature in a managed way.
+ * <p></p>
+ * One notable feature is that the
+ * {@link #foreach(RemoteIterator, ConsumerRaisingIOE)} method will
+ * LOG at debug any IOStatistics provided by the iterator, if such
+ * statistics are provided. There's no attempt at retrieval and logging
+ * if the LOG is not set to debug, so it is a zero cost feature unless
+ * the logger {@code org.apache.hadoop.util.functional.RemoteIterators}
+ * is at DEBUG.
+ * <p></p>
+ * Based on the S3A Listing code, and some work on moving other code
+ * to using iterative listings so as to pick up the statistics.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public final class RemoteIterators {
Review comment:
We already have a RemoteIterator class in hadoop. Can we extend that
one, or is there a reason to create a whole new one here?
##########
File path:
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WrappedIOException.java
##########
@@ -30,13 +31,15 @@
* A wrapper for an IOException which
* {@link FutureIOSupport#raiseInnerCause(ExecutionException)} knows to
* always extract the exception.
- *
+ * <p></p>
* The constructor signature guarantees the cause will be an IOException,
* and as it checks for a null-argument, non-null.
+ * @deprecated use the {@code UncheckedIOException}.
*/
+@Deprecated
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class WrappedIOException extends RuntimeException {
+public class WrappedIOException extends UncheckedIOException {
Review comment:
Nice touch; we were talking about how this exception is really a narrower
kind of runtime exception.
##########
File path:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
##########
@@ -1017,68 +1198,63 @@ void blockUploadCompleted(long duration, int blockSize)
{
* A final transfer completed event is still expected, so this
* does not decrement the active block counter.
*/
- void blockUploadFailed(long duration, int blockSize) {
- blockUploadsFailed.incrementAndGet();
+ @Override
+ public void blockUploadFailed(long duration, int blockSize) {
+ incCounter(StreamStatisticNames.STREAM_WRITE_EXCEPTIONS);
}
/** Intermediate report of bytes uploaded. */
- void bytesTransferred(long byteCount) {
+ @Override
+ public void bytesTransferred(long byteCount) {
bytesUploaded.addAndGet(byteCount);
- statistics.incrementBytesWritten(byteCount);
bytesPendingUpload.addAndGet(-byteCount);
incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING, -byteCount);
}
- /**
- * Note exception in a multipart complete.
- * @param count count of exceptions
- */
- void exceptionInMultipartComplete(int count) {
+ @Override
+ public void exceptionInMultipartComplete(int count) {
if (count > 0) {
- exceptionsInMultipartFinalize.addAndGet(count);
+ incCounter(
+ STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS.getSymbol(),
+ count);
}
}
- /**
- * Note an exception in a multipart abort.
- */
- void exceptionInMultipartAbort() {
- exceptionsInMultipartFinalize.incrementAndGet();
+ @Override
Review comment:
Maybe we could keep the javadocs here
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 514572)
Time Spent: 6h (was: 5h 50m)
> S3A statistics to support IOStatistics
> --------------------------------------
>
> Key: HADOOP-17271
> URL: https://issues.apache.org/jira/browse/HADOOP-17271
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Affects Versions: 3.3.0
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Priority: Major
> Labels: pull-request-available
> Time Spent: 6h
> Remaining Estimate: 0h
>
> S3A to rework statistics with
> * API + Implementation split of the interfaces used by subcomponents when
> reporting stats
> * S3A Instrumentation to implement all the interfaces
> * streams, etc to all implement IOStatisticsSources and serve to callers
> * Add some tracking of durations of remote requests
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]