ahmarsuhail commented on code in PR #6180:
URL: https://github.com/apache/hadoop/pull/6180#discussion_r1407618042
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java:
##########
@@ -144,14 +152,29 @@ private Constants() {
SimpleAWSCredentialsProvider.NAME;
- // the maximum number of tasks cached if all threads are already uploading
+ /**
+ * The maximum number of tasks queued (other than prefetcher tasks) if all
threads are
+ * busy: {@value}.
+ */
public static final String MAX_TOTAL_TASKS = "fs.s3a.max.total.tasks";
+ /**
+ * Default value for {@link #MAX_TOTAL_TASKS}: {@value}.
+ */
public static final int DEFAULT_MAX_TOTAL_TASKS = 32;
- // number of simultaneous connections to s3
+ /**
+ * Number of simultaneous connections to S3: {@value}.
+ */
public static final String MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum";
- public static final int DEFAULT_MAXIMUM_CONNECTIONS = 96;
+
+ /**
+ * Default value for {@link #MAXIMUM_CONNECTIONS}: {@value}.
+ * Note this value gets increased over time as more connections are used
Review Comment:
I think we should make this clearer. As written, it reads to me like
"the connection pool size will increase dynamically", but I guess what we mean
is that the default has been raised over time as we've added prefetching and
vectored IO and now do more parallel ops.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java:
##########
@@ -371,24 +407,219 @@ private static void initSigner(Configuration conf,
}
/**
- * Configures request timeout.
+ * Configures request timeout in the client configuration.
+ * This is independent of the timeouts set in the sync and async HTTP
clients;
+ * the same method
*
* @param conf Hadoop configuration
* @param clientConfig AWS SDK configuration to update
*/
private static void initRequestTimeout(Configuration conf,
ClientOverrideConfiguration.Builder clientConfig) {
- long requestTimeoutMillis = conf.getTimeDuration(REQUEST_TIMEOUT,
- DEFAULT_REQUEST_TIMEOUT, TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
+ // Get the connection settings
+ final ConnectionSettings conn = createApiConnectionSettings(conf);
+ final Duration callTimeout = conn.getApiCallTimeout();
- if (requestTimeoutMillis > Integer.MAX_VALUE) {
- LOG.debug("Request timeout is too high({} ms). Setting to {} ms instead",
- requestTimeoutMillis, Integer.MAX_VALUE);
- requestTimeoutMillis = Integer.MAX_VALUE;
+ if (callTimeout.toMillis() > 0) {
Review Comment:
My understanding is that apiCallTimeout is the total time allowed for the
API call, including retries, and apiCallAttemptTimeout is the timeout for an
individual HTTP request. I think apiCallTimeout should be:
number of SDK retries * callTimeout. Will confirm with the SDK team.
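A minimal sketch of the relationship described above, using only `java.time.Duration`. `CallTimeoutSketch`, `totalCallTimeout` and `maxAttempts` are hypothetical names, not part of the PR or the SDK. It assumes the overall budget is simply the per-attempt timeout multiplied by the number of attempts and ignores any backoff delay between retries, so treat it as an illustration rather than the final formula.

```java
import java.time.Duration;

/** Hypothetical helper, not part of the PR: derives an overall call timeout. */
final class CallTimeoutSketch {

  private CallTimeoutSketch() {
  }

  /**
   * Scale the per-attempt timeout by the number of attempts.
   * Assumption: backoff delays between attempts are not included.
   *
   * @param attemptTimeout timeout for a single HTTP request attempt
   * @param maxAttempts total number of attempts allowed (must be >= 1)
   * @return overall time budget for the API call
   */
  static Duration totalCallTimeout(Duration attemptTimeout, int maxAttempts) {
    if (attemptTimeout.isNegative() || maxAttempts < 1) {
      throw new IllegalArgumentException(
          "Need a non-negative timeout and at least one attempt");
    }
    return attemptTimeout.multipliedBy(maxAttempts);
  }
}
```

With something like this, `apiCallAttemptTimeout` would keep the configured value and `apiCallTimeout` would take the scaled one, instead of both being set to the same duration.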
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java:
##########
@@ -371,24 +407,219 @@ private static void initSigner(Configuration conf,
}
/**
- * Configures request timeout.
+ * Configures request timeout in the client configuration.
+ * This is independent of the timeouts set in the sync and async HTTP
clients;
+ * the same method
*
* @param conf Hadoop configuration
* @param clientConfig AWS SDK configuration to update
*/
private static void initRequestTimeout(Configuration conf,
ClientOverrideConfiguration.Builder clientConfig) {
- long requestTimeoutMillis = conf.getTimeDuration(REQUEST_TIMEOUT,
- DEFAULT_REQUEST_TIMEOUT, TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
+ // Get the connection settings
+ final ConnectionSettings conn = createApiConnectionSettings(conf);
+ final Duration callTimeout = conn.getApiCallTimeout();
- if (requestTimeoutMillis > Integer.MAX_VALUE) {
- LOG.debug("Request timeout is too high({} ms). Setting to {} ms instead",
- requestTimeoutMillis, Integer.MAX_VALUE);
- requestTimeoutMillis = Integer.MAX_VALUE;
+ if (callTimeout.toMillis() > 0) {
+ clientConfig.apiCallAttemptTimeout(callTimeout);
+ clientConfig.apiCallTimeout(callTimeout);
}
+ }
- if(requestTimeoutMillis > 0) {
-
clientConfig.apiCallAttemptTimeout(Duration.ofMillis(requestTimeoutMillis));
+ /**
+ * Reset the minimum operation duration to the default.
+ * Logs at INFO.
+ * <p>
+ * This MUST be called in test teardown in any test suite which
+ * called {@link #setMinimumOperationDuration(Duration)}.
+ */
+ public static void resetMinimumOperationDuration() {
+ setMinimumOperationDuration(MINIMUM_NETWORK_OPERATION_DURATION);
+ }
+
+ /**
+ * Set the minimum operation duration.
+ * This is for testing and will log at info; does require a non-negative
duration.
+ * <p>
+ * Test suites must call {@link #resetMinimumOperationDuration()} in their
teardown
+ * to avoid breaking subsequent tests in the same process.
+ * @param duration non-negative duration
+ * @throws IllegalArgumentException if the duration is negative.
+ */
+ @VisibleForTesting
+ public static void setMinimumOperationDuration(Duration duration) {
+ LOG.info("Setting minimum operation duration to {}ms",
duration.toMillis());
+ checkArgument(duration.compareTo(Duration.ZERO) >= 0,
+ "Duration must be positive: %sms", duration.toMillis());
+ minimumOperationDuration = duration;
+ }
+
+ /**
+ * Get the current minimum operation duration.
+ * @return current duration.
+ */
+ public static Duration getMinimumOperationDuration() {
+ return minimumOperationDuration;
+ }
+
+ /**
+ * All the connection settings, wrapped as a class for use by
+ * both the sync and async clients, and connection client builder.
+ */
+ static class ConnectionSettings {
Review Comment:
As this constructor is getting quite large, maybe we can use a builder here
instead.
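A rough sketch of what a builder could look like, assuming a cut-down stand-in class with only four of the eight fields; the remaining `Duration` fields would follow the same pattern. `ConnectionSettingsSketch` and the `with...` method names are hypothetical and not taken from the PR.

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical stand-in for ConnectionSettings; only four fields shown. */
final class ConnectionSettingsSketch {
  private final int maxConnections;
  private final boolean keepAlive;
  private final Duration acquisitionTimeout;
  private final Duration socketTimeout;

  private ConnectionSettingsSketch(Builder b) {
    this.maxConnections = b.maxConnections;
    this.keepAlive = b.keepAlive;
    // Fail at build time if a duration was never set.
    this.acquisitionTimeout =
        Objects.requireNonNull(b.acquisitionTimeout, "acquisitionTimeout");
    this.socketTimeout =
        Objects.requireNonNull(b.socketTimeout, "socketTimeout");
  }

  static Builder builder() {
    return new Builder();
  }

  int getMaxConnections() {
    return maxConnections;
  }

  boolean isKeepAlive() {
    return keepAlive;
  }

  Duration getAcquisitionTimeout() {
    return acquisitionTimeout;
  }

  Duration getSocketTimeout() {
    return socketTimeout;
  }

  /** Each setter names its field, so same-typed arguments cannot be swapped. */
  static final class Builder {
    private int maxConnections;
    private boolean keepAlive;
    private Duration acquisitionTimeout;
    private Duration socketTimeout;

    private Builder() {
    }

    Builder withMaxConnections(int value) {
      this.maxConnections = value;
      return this;
    }

    Builder withKeepAlive(boolean value) {
      this.keepAlive = value;
      return this;
    }

    Builder withAcquisitionTimeout(Duration value) {
      this.acquisitionTimeout = value;
      return this;
    }

    Builder withSocketTimeout(Duration value) {
      this.socketTimeout = value;
      return this;
    }

    ConnectionSettingsSketch build() {
      return new ConnectionSettingsSketch(this);
    }
  }
}
```

The main benefit is at the call site: with six `Duration` parameters in a row, a positional constructor makes it easy to pass them in the wrong order without the compiler noticing.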
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java:
##########
@@ -371,24 +407,219 @@ private static void initSigner(Configuration conf,
}
/**
- * Configures request timeout.
+ * Configures request timeout in the client configuration.
+ * This is independent of the timeouts set in the sync and async HTTP
clients;
+ * the same method
*
* @param conf Hadoop configuration
* @param clientConfig AWS SDK configuration to update
*/
private static void initRequestTimeout(Configuration conf,
ClientOverrideConfiguration.Builder clientConfig) {
- long requestTimeoutMillis = conf.getTimeDuration(REQUEST_TIMEOUT,
- DEFAULT_REQUEST_TIMEOUT, TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
+ // Get the connection settings
+ final ConnectionSettings conn = createApiConnectionSettings(conf);
+ final Duration callTimeout = conn.getApiCallTimeout();
- if (requestTimeoutMillis > Integer.MAX_VALUE) {
- LOG.debug("Request timeout is too high({} ms). Setting to {} ms instead",
- requestTimeoutMillis, Integer.MAX_VALUE);
- requestTimeoutMillis = Integer.MAX_VALUE;
+ if (callTimeout.toMillis() > 0) {
+ clientConfig.apiCallAttemptTimeout(callTimeout);
+ clientConfig.apiCallTimeout(callTimeout);
}
+ }
- if(requestTimeoutMillis > 0) {
-
clientConfig.apiCallAttemptTimeout(Duration.ofMillis(requestTimeoutMillis));
+ /**
+ * Reset the minimum operation duration to the default.
+ * Logs at INFO.
+ * <p>
+ * This MUST be called in test teardown in any test suite which
+ * called {@link #setMinimumOperationDuration(Duration)}.
+ */
+ public static void resetMinimumOperationDuration() {
+ setMinimumOperationDuration(MINIMUM_NETWORK_OPERATION_DURATION);
+ }
+
+ /**
+ * Set the minimum operation duration.
+ * This is for testing and will log at info; does require a non-negative
duration.
+ * <p>
+ * Test suites must call {@link #resetMinimumOperationDuration()} in their
teardown
+ * to avoid breaking subsequent tests in the same process.
+ * @param duration non-negative duration
+ * @throws IllegalArgumentException if the duration is negative.
+ */
+ @VisibleForTesting
+ public static void setMinimumOperationDuration(Duration duration) {
+ LOG.info("Setting minimum operation duration to {}ms",
duration.toMillis());
+ checkArgument(duration.compareTo(Duration.ZERO) >= 0,
+ "Duration must be positive: %sms", duration.toMillis());
+ minimumOperationDuration = duration;
+ }
+
+ /**
+ * Get the current minimum operation duration.
+ * @return current duration.
+ */
+ public static Duration getMinimumOperationDuration() {
+ return minimumOperationDuration;
+ }
+
+ /**
+ * All the connection settings, wrapped as a class for use by
+ * both the sync and async clients, and connection client builder.
+ */
+ static class ConnectionSettings {
+ private final int maxConnections;
+ private final boolean keepAlive;
+ private final Duration acquisitionTimeout;
+ private final Duration apiCallTimeout;
+ private final Duration connectionTTL;
+ private final Duration establishTimeout;
+ private final Duration maxIdleTime;
+ private final Duration socketTimeout;
+
+ ConnectionSettings(
+ final int maxConnections,
+ final boolean keepAlive,
+ final Duration apiCallTimeout,
+ final Duration acquisitionTimeout,
+ final Duration connectionTTL,
+ final Duration establishTimeout,
+ final Duration maxIdleTime,
+ final Duration socketTimeout) {
+ this.maxConnections = maxConnections;
+ this.keepAlive = keepAlive;
+ this.acquisitionTimeout = acquisitionTimeout;
+ this.apiCallTimeout = apiCallTimeout;
+ this.connectionTTL = connectionTTL;
+ this.establishTimeout = establishTimeout;
+ this.maxIdleTime = maxIdleTime;
+ this.socketTimeout = socketTimeout;
+ }
+
+ int getMaxConnections() {
+ return maxConnections;
+ }
+
+ boolean isKeepAlive() {
+ return keepAlive;
+ }
+
+ Duration getAcquisitionTimeout() {
+ return acquisitionTimeout;
+ }
+
+ Duration getApiCallTimeout() {
+ return apiCallTimeout;
+ }
+
+ Duration getConnectionTTL() {
+ return connectionTTL;
+ }
+
+ Duration getEstablishTimeout() {
+ return establishTimeout;
+ }
+
+ Duration getMaxIdleTime() {
+ return maxIdleTime;
+ }
+
+ Duration getSocketTimeout() {
+ return socketTimeout;
+ }
+
+ @Override
+ public String toString() {
+ return "ConnectionSettings{" +
+ "maxConnections=" + maxConnections +
+ ", keepAlive=" + keepAlive +
+ ", acquisitionTimeout=" + acquisitionTimeout +
+ ", apiCallTimeout=" + apiCallTimeout +
+ ", connectionTTL=" + connectionTTL +
+ ", establishTimeout=" + establishTimeout +
+ ", maxIdleTime=" + maxIdleTime +
+ ", socketTimeout=" + socketTimeout +
+ '}';
}
}
+
+
+ /**
+ * Build a connection settings object with only the settings used
+ * for the ClientConfig only.
+ * All other fields are null and MUST NOT be used.
+ * @param conf configuration to evaluate
+ * @return connection settings.
+ */
+ static ConnectionSettings createApiConnectionSettings(Configuration conf) {
Review Comment:
There's only one connection setting config on the client, and the rest are
for the HTTP builders. Can we simplify this by keeping apiCallTimeout out of
ConnectionSettings?
##########
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md:
##########
@@ -205,31 +205,86 @@ The default pool sizes are intended to strike a balance
between performance
and memory/thread use.
You can have a larger pool of (reused) HTTP connections and threads
-for parallel IO (especially uploads) by setting the properties
+for parallel IO (especially uploads, prefetching and vector reads) by setting
the appropriate
+properties. Note: S3A Connectors have their own thread pools for job commit,
but
+everything uses the same HTTP connection pool.
+| Property | Default | Meaning
|
+|--------------------------------|---------|------------------------------------------|
+| `fs.s3a.threads.max` | `96` | Threads in the thread pool
|
+| `fs.s3a.threads.keepalivetime` | `60s` | Threads in the thread pool
|
Review Comment:
Typo in the meaning for keepalivetime: it repeats the `fs.s3a.threads.max` description.
##########
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md:
##########
@@ -205,31 +205,86 @@ The default pool sizes are intended to strike a balance
between performance
and memory/thread use.
You can have a larger pool of (reused) HTTP connections and threads
-for parallel IO (especially uploads) by setting the properties
+for parallel IO (especially uploads, prefetching and vector reads) by setting
the appropriate
+properties. Note: S3A Connectors have their own thread pools for job commit,
but
+everything uses the same HTTP connection pool.
+| Property | Default | Meaning
|
+|--------------------------------|---------|------------------------------------------|
+| `fs.s3a.threads.max` | `96` | Threads in the thread pool
|
+| `fs.s3a.threads.keepalivetime` | `60s` | Threads in the thread pool
|
+| `fs.s3a.executor.capacity` | `16` | Maximum threads for any single
operation |
+| `fs.s3a.max.total.tasks` | `16` | Extra tasks which can be queued
|
Review Comment:
Can mention that this does not include prefetching tasks.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java:
##########
@@ -171,9 +173,18 @@ public static IOException translateException(@Nullable
String operation,
operation,
StringUtils.isNotEmpty(path)? (" on " + path) : "",
exception);
- if (!(exception instanceof AwsServiceException)) {
- // exceptions raised client-side: connectivity, auth, network problems...
+ // timeout issues
+ if (exception instanceof ApiCallTimeoutException
Review Comment:
Move to the ErrorTranslation class? Not sure how we decide what goes there and
what stays here. I think we said all of this will be moved to ErrorTranslation
anyway.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java:
##########
@@ -183,6 +185,12 @@ public static IOException translateException(@Nullable
String operation,
// call considered an sign of connectivity failure
return (EOFException)new EOFException(message).initCause(exception);
}
+ if (exception instanceof ApiCallTimeoutException
+ || exception instanceof ApiCallAttemptTimeoutException) {
+ // An API call to an AWS service timed out.
Review Comment:
apiCallTimeout is the timeout of the whole API call, including retries;
apiCallAttemptTimeout is the timeout of an individual HTTP request.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java:
##########
@@ -371,24 +407,219 @@ private static void initSigner(Configuration conf,
}
/**
- * Configures request timeout.
+ * Configures request timeout in the client configuration.
+ * This is independent of the timeouts set in the sync and async HTTP
clients;
+ * the same method
*
* @param conf Hadoop configuration
* @param clientConfig AWS SDK configuration to update
*/
private static void initRequestTimeout(Configuration conf,
ClientOverrideConfiguration.Builder clientConfig) {
- long requestTimeoutMillis = conf.getTimeDuration(REQUEST_TIMEOUT,
- DEFAULT_REQUEST_TIMEOUT, TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
+ // Get the connection settings
+ final ConnectionSettings conn = createApiConnectionSettings(conf);
+ final Duration callTimeout = conn.getApiCallTimeout();
- if (requestTimeoutMillis > Integer.MAX_VALUE) {
- LOG.debug("Request timeout is too high({} ms). Setting to {} ms instead",
- requestTimeoutMillis, Integer.MAX_VALUE);
- requestTimeoutMillis = Integer.MAX_VALUE;
+ if (callTimeout.toMillis() > 0) {
+ clientConfig.apiCallAttemptTimeout(callTimeout);
+ clientConfig.apiCallTimeout(callTimeout);
}
+ }
- if(requestTimeoutMillis > 0) {
-
clientConfig.apiCallAttemptTimeout(Duration.ofMillis(requestTimeoutMillis));
+ /**
+ * Reset the minimum operation duration to the default.
+ * Logs at INFO.
+ * <p>
+ * This MUST be called in test teardown in any test suite which
+ * called {@link #setMinimumOperationDuration(Duration)}.
+ */
+ public static void resetMinimumOperationDuration() {
Review Comment:
Update the Javadocs to mention it's only for test use.
##########
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md:
##########
@@ -205,31 +205,86 @@ The default pool sizes are intended to strike a balance
between performance
and memory/thread use.
You can have a larger pool of (reused) HTTP connections and threads
-for parallel IO (especially uploads) by setting the properties
+for parallel IO (especially uploads, prefetching and vector reads) by setting
the appropriate
+properties. Note: S3A Connectors have their own thread pools for job commit,
but
+everything uses the same HTTP connection pool.
+| Property | Default | Meaning
|
+|--------------------------------|---------|------------------------------------------|
+| `fs.s3a.threads.max` | `96` | Threads in the thread pool
|
+| `fs.s3a.threads.keepalivetime` | `60s` | Threads in the thread pool
|
+| `fs.s3a.executor.capacity` | `16` | Maximum threads for any single
operation |
+| `fs.s3a.max.total.tasks` | `16` | Extra tasks which can be queued
|
-| property | meaning | default |
-|----------|---------|---------|
-| `fs.s3a.threads.max`| Threads in the AWS transfer manager| 10 |
-| `fs.s3a.connection.maximum`| Maximum number of HTTP connections | 10|
-We recommend using larger values for processes which perform
-a lot of IO: `DistCp`, Spark Workers and similar.
+Network timeout options can be tuned to make the client fail faster *or* retry
more.
+The choice is yours. Generally recovery is better, but sometimes fail-fast is
more useful.
-```xml
-<property>
- <name>fs.s3a.threads.max</name>
- <value>20</value>
-</property>
-<property>
- <name>fs.s3a.connection.maximum</name>
- <value>20</value>
-</property>
-```
-Be aware, however, that processes which perform many parallel queries
-may consume large amounts of resources if each query is working with
-a different set of s3 buckets, or are acting on behalf of different users.
+| Property | Default | V2 | Meaning
|
+|-----------------------------------------|---------|:----|-------------------------------------------------------|
+| `fs.s3a.threads.max` | `96` | | Threads in the
thread pool |
+| `fs.s3a.threads.keepalivetime` | `60s` | | Threads in the
thread pool |
Review Comment:
also typo here
##########
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md:
##########
@@ -205,31 +205,86 @@ The default pool sizes are intended to strike a balance
between performance
and memory/thread use.
You can have a larger pool of (reused) HTTP connections and threads
-for parallel IO (especially uploads) by setting the properties
+for parallel IO (especially uploads, prefetching and vector reads) by setting
the appropriate
+properties. Note: S3A Connectors have their own thread pools for job commit,
but
+everything uses the same HTTP connection pool.
+| Property | Default | Meaning
|
+|--------------------------------|---------|------------------------------------------|
+| `fs.s3a.threads.max` | `96` | Threads in the thread pool
|
+| `fs.s3a.threads.keepalivetime` | `60s` | Threads in the thread pool
|
+| `fs.s3a.executor.capacity` | `16` | Maximum threads for any single
operation |
+| `fs.s3a.max.total.tasks` | `16` | Extra tasks which can be queued
|
-| property | meaning | default |
-|----------|---------|---------|
-| `fs.s3a.threads.max`| Threads in the AWS transfer manager| 10 |
-| `fs.s3a.connection.maximum`| Maximum number of HTTP connections | 10|
-We recommend using larger values for processes which perform
-a lot of IO: `DistCp`, Spark Workers and similar.
+Network timeout options can be tuned to make the client fail faster *or* retry
more.
+The choice is yours. Generally recovery is better, but sometimes fail-fast is
more useful.
-```xml
-<property>
- <name>fs.s3a.threads.max</name>
- <value>20</value>
-</property>
-<property>
- <name>fs.s3a.connection.maximum</name>
- <value>20</value>
-</property>
-```
-Be aware, however, that processes which perform many parallel queries
-may consume large amounts of resources if each query is working with
-a different set of s3 buckets, or are acting on behalf of different users.
+| Property | Default | V2 | Meaning
|
+|-----------------------------------------|---------|:----|-------------------------------------------------------|
+| `fs.s3a.threads.max` | `96` | | Threads in the
thread pool |
+| `fs.s3a.threads.keepalivetime` | `60s` | | Threads in the
thread pool |
+| `fs.s3a.executor.capacity` | `16` | | Maximum threads
for any single operation |
+| `fs.s3a.max.total.tasks` | `16` | | Maximum threads
for any single operation |
Review Comment:
Typo in the description for total tasks: it repeats the `fs.s3a.executor.capacity` text.
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java:
##########
@@ -1574,4 +1574,6 @@ public static boolean
isCreatePerformanceEnabled(FileSystem fs)
}
Review Comment:
can revert this
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]