[
https://issues.apache.org/jira/browse/HADOOP-18457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17620364#comment-17620364
]
ASF GitHub Bot commented on HADOOP-18457:
-----------------------------------------
steveloughran commented on code in PR #5034:
URL: https://github.com/apache/hadoop/pull/5034#discussion_r999378107
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java:
##########
@@ -32,6 +32,7 @@
public final class FileSystemConfigurations {
public static final String DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "";
+ public static final boolean DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = true;
Review Comment:
1. add with the other changes in this file to make cherrypick easier.
2. Why is this the default?
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -25,6 +25,7 @@
import java.net.UnknownHostException;
import java.util.List;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
Review Comment:
check your ide settings of import placement, this should be in the next block
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -46,25 +48,49 @@ public final class AbfsClientThrottlingIntercept {
private AbfsClientThrottlingAnalyzer readThrottler = null;
private AbfsClientThrottlingAnalyzer writeThrottler = null;
private static boolean isAutoThrottlingEnabled = false;
+ private String accountName = "";
+
+ private synchronized void setIsAutoThrottlingEnabled(boolean autoThrottlingEnabled) {
+ isAutoThrottlingEnabled = autoThrottlingEnabled;
+ }
// Hide default constructor
- private AbfsClientThrottlingIntercept() {
- readThrottler = new AbfsClientThrottlingAnalyzer("read");
- writeThrottler = new AbfsClientThrottlingAnalyzer("write");
+ public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsConfiguration) {
+ setIsAutoThrottlingEnabled(abfsConfiguration.isAutoThrottlingEnabled());
+ this.accountName = accountName;
+ this.readThrottler = setAnalyzer("read " + accountName, abfsConfiguration);
+ this.writeThrottler = setAnalyzer("write " + accountName, abfsConfiguration);
+ LOG.debug("Client-side throttling is enabled for the ABFS file system for the account : {}", accountName);
}
- public static synchronized void initializeSingleton(boolean enableAutoThrottling) {
- if (!enableAutoThrottling) {
- return;
- }
+ // Hide default constructor
+ private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
+ readThrottler = setAnalyzer("read", abfsConfiguration);
+ writeThrottler = setAnalyzer("write", abfsConfiguration);
+ }
+
+ private AbfsClientThrottlingAnalyzer setAnalyzer(String name, AbfsConfiguration abfsConfiguration) {
+ return new AbfsClientThrottlingAnalyzer(name, abfsConfiguration);
+ }
+
+ AbfsClientThrottlingAnalyzer getReadThrottler() {
+ return readThrottler;
+ }
+
+ AbfsClientThrottlingAnalyzer getWriteThrottler() {
+ return writeThrottler;
+ }
+
+ public static synchronized AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsConfiguration) {
Review Comment:
can you add javadocs for these methods; the hadoop-azure module is
underdocumented, but that is no reason to not add some yourself
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -20,12 +20,14 @@
import java.net.HttpURLConnection;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
Review Comment:
import ordering wrong
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -278,8 +284,9 @@ private boolean executeHttpOperation(final int retryCount,
// dump the headers
AbfsIoUtils.dumpHeadersToDebugLog("Request Headers",
httpOperation.getConnection().getRequestProperties());
- AbfsClientThrottlingIntercept.sendingRequest(operationType, abfsCounters);
-
+ if (intercept != null) {
Review Comment:
by having the abfsclient/analyzer factory return a `NoopAbfsClientThrottlingIntercept` interceptor when interception is disabled, there will be no need for this check
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -46,25 +48,49 @@ public final class AbfsClientThrottlingIntercept {
private AbfsClientThrottlingAnalyzer readThrottler = null;
private AbfsClientThrottlingAnalyzer writeThrottler = null;
private static boolean isAutoThrottlingEnabled = false;
+ private String accountName = "";
+
+ private synchronized void setIsAutoThrottlingEnabled(boolean autoThrottlingEnabled) {
+ isAutoThrottlingEnabled = autoThrottlingEnabled;
+ }
// Hide default constructor
- private AbfsClientThrottlingIntercept() {
- readThrottler = new AbfsClientThrottlingAnalyzer("read");
- writeThrottler = new AbfsClientThrottlingAnalyzer("write");
+ public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsConfiguration) {
Review Comment:
factor this out into the interface `AbfsClientThrottlingIntercept` and an implementation `AnalyzingAbfsClientThrottlingIntercept`. Then add a no-op intercept, `NoopAbfsClientThrottlingIntercept`, which can be returned from lookups when interception is off, so ensuring there's no need to check for null values in the rest operation
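A standalone sketch of that refactor; the type names and the `sendingRequest` signature here are illustrative stand-ins for the actual hadoop-azure classes:

```java
// Hypothetical, simplified stand-ins for the ABFS types under review.
interface ThrottlingIntercept {
  // called before each request; the real method takes operation type and counters
  void sendingRequest();
}

// Analyzing implementation: would consult the read/write throttlers.
class AnalyzingThrottlingIntercept implements ThrottlingIntercept {
  private int requests;

  @Override
  public void sendingRequest() {
    requests++; // placeholder for the real throttling analysis
  }

  int requestCount() {
    return requests;
  }
}

// No-op implementation returned when throttling is disabled, so callers
// such as the rest operation never need a null check.
enum NoopThrottlingIntercept implements ThrottlingIntercept {
  INSTANCE;

  @Override
  public void sendingRequest() {
    // deliberately empty
  }
}

final class InterceptFactory {
  private InterceptFactory() {
  }

  static ThrottlingIntercept getInstance(boolean throttlingEnabled) {
    return throttlingEnabled
        ? new AnalyzingThrottlingIntercept()
        : NoopThrottlingIntercept.INSTANCE;
  }
}
```

Because the factory always returns a non-null intercept, the `if (intercept != null)` guard in the rest operation disappears.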
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOperationMetrics.java:
##########
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Stores Abfs operation metrics during each analysis period.
+ */
+class AbfsOperationMetrics {
Review Comment:
have getters return the actual long value; add add() methods for each counter type and invoke them. this keeps the use of atomic longs an implementation detail
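A minimal sketch of that encapsulation; the field names follow the quoted diff, but the class shape is illustrative:

```java
import java.util.concurrent.atomic.AtomicLong;

// Counters are mutated via add/increment methods and read as plain longs,
// keeping the AtomicLong representation an implementation detail.
class OperationMetricsSketch {
  private final AtomicLong bytesFailed = new AtomicLong();
  private final AtomicLong bytesSuccessful = new AtomicLong();
  private final AtomicLong operationsFailed = new AtomicLong();

  void addBytesFailed(long count) {
    bytesFailed.addAndGet(count);
  }

  void addBytesSuccessful(long count) {
    bytesSuccessful.addAndGet(count);
  }

  void incrementOperationsFailed() {
    operationsFailed.incrementAndGet();
  }

  long getBytesFailed() {
    return bytesFailed.get();
  }

  long getBytesSuccessful() {
    return bytesSuccessful.get();
  }

  long getOperationsFailed() {
    return operationsFailed.get();
  }
}
```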
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -242,6 +243,11 @@ private void completeExecute(TracingContext tracingContext)
private boolean executeHttpOperation(final int retryCount,
TracingContext tracingContext) throws AzureBlobFileSystemException {
AbfsHttpOperation httpOperation = null;
+ AbfsConfiguration abfsConfiguration = client.getAbfsConfiguration();
+ String accountName = abfsConfiguration.getAccountName();
+ AbfsClientThrottlingIntercept intercept
+ = AbfsClientThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration);
Review Comment:
nit, put the = on the line above
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -67,6 +87,130 @@ public void testDefaultMaxIORetryCount() throws Exception {
testMaxIOConfig(abfsConfig);
}
+ @Test
+ public void testCreateMultipleAccountThrottling() throws Exception {
+ Configuration config = new Configuration(getRawConfiguration());
+ String accountName = config.get(FS_AZURE_ACCOUNT_NAME);
+ if (accountName == null) {
+ // check if accountName is set using different config key
+ accountName = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+ }
+ assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
+ accountName != null && !accountName.isEmpty());
+
+ Configuration rawConfig1 = new Configuration();
+ rawConfig1.addResource(TEST_CONFIGURATION_FILE_NAME);
+
+ AbfsRestOperation successOp = mock(AbfsRestOperation.class);
+ AbfsHttpOperation http500Op = mock(AbfsHttpOperation.class);
+ when(http500Op.getStatusCode()).thenReturn(HTTP_INTERNAL_ERROR);
+ when(successOp.getResult()).thenReturn(http500Op);
+
+ AbfsConfiguration configuration = Mockito.mock(AbfsConfiguration.class);
+ when(configuration.getAnalysisPeriod()).thenReturn(10000);
+ when(configuration.isAutoThrottlingEnabled()).thenReturn(true);
+ when(configuration.isAccountThrottlingEnabled()).thenReturn(false);
+
+ AbfsClientThrottlingIntercept instance1 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+
+ AbfsClientThrottlingIntercept instance2 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+ //if singleton is enabled, for different accounts both the instances should return same value
+ assertEquals(instance1, instance2);
+
+ when(configuration.isAccountThrottlingEnabled()).thenReturn(true);
+ AbfsClientThrottlingIntercept instance3 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ AbfsClientThrottlingIntercept instance4 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+ AbfsClientThrottlingIntercept instance5 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ //if singleton is not enabled, for different accounts instances should return different value
+ assertNotEquals(instance3, instance4);
+
+ //if singleton is not enabled, for same accounts instances should return same value
+ assertEquals(instance3, instance5);
+ }
+
+ @Test
+ public void testOperationOnAccountIdle() throws Exception {
+ //Get the filesystem.
+ AzureBlobFileSystem fs = getFileSystem();
+ AbfsConfiguration configuration1 = fs.getAbfsStore().getClient().getAbfsConfiguration();
+ Assume.assumeTrue(configuration1.isAccountThrottlingEnabled());
+
+ AbfsClientThrottlingIntercept accountIntercept = AbfsClientThrottlingInterceptFactory.getInstance(this.getAccountName(), configuration1);
+ int bufferSize = MIN_BUFFER_SIZE;
+ final byte[] b = new byte[2 * bufferSize];
+ new Random().nextBytes(b);
+
+ Path testPath = path(TEST_PATH);
+
+ //Do an operation on the filesystem.
+ FSDataOutputStream stream = fs.create(testPath);
Review Comment:
use try-with-resources
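For reference, the requested pattern, shown here with plain java.io streams rather than the Hadoop `FSDataOutputStream` used in the test:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class TryWithResourcesDemo {
  // The stream is closed automatically, even if write() throws, replacing
  // the explicit try { ... } finally { stream.close(); } pattern.
  static byte[] write(byte[] data) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try (OutputStream out = sink) {
      out.write(data);
    }
    return sink.toByteArray();
  }
}
```

The same shape applies directly to `fs.create(testPath)` and `fs.open(testPath)` in the test: `try (FSDataOutputStream stream = fs.create(testPath)) { stream.write(b); }`.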
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -46,25 +48,49 @@ public final class AbfsClientThrottlingIntercept {
private AbfsClientThrottlingAnalyzer readThrottler = null;
private AbfsClientThrottlingAnalyzer writeThrottler = null;
private static boolean isAutoThrottlingEnabled = false;
+ private String accountName = "";
+
+ private synchronized void setIsAutoThrottlingEnabled(boolean autoThrottlingEnabled) {
Review Comment:
use setAutoThrottlingEnabled, as it reads better. maybe rename the field to match
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -67,6 +87,130 @@ public void testDefaultMaxIORetryCount() throws Exception {
testMaxIOConfig(abfsConfig);
}
+ @Test
+ public void testCreateMultipleAccountThrottling() throws Exception {
+ Configuration config = new Configuration(getRawConfiguration());
+ String accountName = config.get(FS_AZURE_ACCOUNT_NAME);
+ if (accountName == null) {
+ // check if accountName is set using different config key
+ accountName = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+ }
+ assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
+ accountName != null && !accountName.isEmpty());
+
+ Configuration rawConfig1 = new Configuration();
+ rawConfig1.addResource(TEST_CONFIGURATION_FILE_NAME);
+
+ AbfsRestOperation successOp = mock(AbfsRestOperation.class);
+ AbfsHttpOperation http500Op = mock(AbfsHttpOperation.class);
+ when(http500Op.getStatusCode()).thenReturn(HTTP_INTERNAL_ERROR);
+ when(successOp.getResult()).thenReturn(http500Op);
+
+ AbfsConfiguration configuration = Mockito.mock(AbfsConfiguration.class);
+ when(configuration.getAnalysisPeriod()).thenReturn(10000);
+ when(configuration.isAutoThrottlingEnabled()).thenReturn(true);
+ when(configuration.isAccountThrottlingEnabled()).thenReturn(false);
+
+ AbfsClientThrottlingIntercept instance1 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+
+ AbfsClientThrottlingIntercept instance2 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+ //if singleton is enabled, for different accounts both the instances should return same value
+ assertEquals(instance1, instance2);
+
+ when(configuration.isAccountThrottlingEnabled()).thenReturn(true);
+ AbfsClientThrottlingIntercept instance3 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ AbfsClientThrottlingIntercept instance4 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+ AbfsClientThrottlingIntercept instance5 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ //if singleton is not enabled, for different accounts instances should return different value
+ assertNotEquals(instance3, instance4);
+
+ //if singleton is not enabled, for same accounts instances should return same value
+ assertEquals(instance3, instance5);
+ }
+
+ @Test
+ public void testOperationOnAccountIdle() throws Exception {
+ //Get the filesystem.
+ AzureBlobFileSystem fs = getFileSystem();
+ AbfsConfiguration configuration1 = fs.getAbfsStore().getClient().getAbfsConfiguration();
+ Assume.assumeTrue(configuration1.isAccountThrottlingEnabled());
+
+ AbfsClientThrottlingIntercept accountIntercept = AbfsClientThrottlingInterceptFactory.getInstance(this.getAccountName(), configuration1);
+ int bufferSize = MIN_BUFFER_SIZE;
+ final byte[] b = new byte[2 * bufferSize];
+ new Random().nextBytes(b);
+
+ Path testPath = path(TEST_PATH);
+
+ //Do an operation on the filesystem.
+ FSDataOutputStream stream = fs.create(testPath);
+
+ try {
+ stream.write(b);
+ } finally {
+ stream.close();
+ }
+
+ //Don't perform any operation on the account.
+ int sleepTime = (int) (getAbfsConfig().getAccountOperationIdleTimeout() * 1.5);
+ Thread.sleep(sleepTime);
+
+ FSDataInputStream streamRead = fs.open(testPath);
+ try {
+ streamRead.read(b);
+ } finally {
+ streamRead.close();
+ }
+
+ //Perform operations on another account.
+ AzureBlobFileSystem fs1 = new AzureBlobFileSystem();
+ Configuration config = new Configuration(getRawConfiguration());
+ String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+ final String abfsUrl1 = this.getFileSystemName() + "12" + "@" + accountName1;
+ URI defaultUri1 = null;
+ try {
+ defaultUri1 = new URI("abfss", abfsUrl1, null, null, null);
+ } catch (Exception ex) {
+ throw new AssertionError(ex);
+ }
+ fs1.initialize(defaultUri1, getRawConfiguration());
+ AbfsConfiguration configuration2 = fs1.getAbfsStore().getClient().getAbfsConfiguration();
+ AbfsClientThrottlingIntercept accountIntercept1 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration2);
+ FSDataOutputStream stream1 = fs1.create(testPath);
+ try {
+ stream1.write(b);
+ } finally{
+ stream1.close();
+ }
+
+ //Verify the write analyzer for first account is idle but the read analyzer is not idle.
+ if (accountIntercept != null) {
+ Assert.assertTrue(accountIntercept.getWriteThrottler().getIsOperationOnAccountIdle().get());
+ Assert.assertFalse(accountIntercept.getReadThrottler().getIsOperationOnAccountIdle().get());
Review Comment:
adding static imports of these asserts would be good
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -46,25 +48,49 @@ public final class AbfsClientThrottlingIntercept {
private AbfsClientThrottlingAnalyzer readThrottler = null;
private AbfsClientThrottlingAnalyzer writeThrottler = null;
private static boolean isAutoThrottlingEnabled = false;
+ private String accountName = "";
Review Comment:
make final and set in both constructors
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -20,12 +20,14 @@
import java.net.HttpURLConnection;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+
Review Comment:
cut to ease cherrypicking
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -46,25 +48,49 @@ public final class AbfsClientThrottlingIntercept {
private AbfsClientThrottlingAnalyzer readThrottler = null;
private AbfsClientThrottlingAnalyzer writeThrottler = null;
private static boolean isAutoThrottlingEnabled = false;
+ private String accountName = "";
+
+ private synchronized void setIsAutoThrottlingEnabled(boolean autoThrottlingEnabled) {
+ isAutoThrottlingEnabled = autoThrottlingEnabled;
+ }
// Hide default constructor
- private AbfsClientThrottlingIntercept() {
- readThrottler = new AbfsClientThrottlingAnalyzer("read");
- writeThrottler = new AbfsClientThrottlingAnalyzer("write");
+ public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsConfiguration) {
+ setIsAutoThrottlingEnabled(abfsConfiguration.isAutoThrottlingEnabled());
+ this.accountName = accountName;
+ this.readThrottler = setAnalyzer("read " + accountName, abfsConfiguration);
+ this.writeThrottler = setAnalyzer("write " + accountName, abfsConfiguration);
+ LOG.debug("Client-side throttling is enabled for the ABFS file system for the account : {}", accountName);
}
- public static synchronized void initializeSingleton(boolean enableAutoThrottling) {
- if (!enableAutoThrottling) {
- return;
- }
+ // Hide default constructor
+ private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
+ readThrottler = setAnalyzer("read", abfsConfiguration);
+ writeThrottler = setAnalyzer("write", abfsConfiguration);
+ }
+
+ private AbfsClientThrottlingAnalyzer setAnalyzer(String name, AbfsConfiguration abfsConfiguration) {
+ return new AbfsClientThrottlingAnalyzer(name, abfsConfiguration);
+ }
+
+ AbfsClientThrottlingAnalyzer getReadThrottler() {
+ return readThrottler;
+ }
+
+ AbfsClientThrottlingAnalyzer getWriteThrottler() {
+ return writeThrottler;
+ }
+
+ public static synchronized AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsConfiguration) {
if (singleton == null) {
Review Comment:
how often is this method invoked?
it may be better to defer the synchronized block until after the singleton == null check, so if it is non-null the current value is returned without any lock acquisition.
if you do that, a double check is needed... other bits of the code do this if you look
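The pattern being described is standard double-checked locking; a self-contained sketch (class name illustrative):

```java
// Fast path reads the volatile field without locking; only when it is null
// do we take the lock and re-check before constructing. The field MUST be
// volatile for this idiom to be safe under the Java memory model.
class LazySingleton {
  private static volatile LazySingleton singleton;

  static LazySingleton getInstance() {
    LazySingleton local = singleton;
    if (local == null) {                     // first check, no lock
      synchronized (LazySingleton.class) {
        local = singleton;
        if (local == null) {                 // second check, under the lock
          local = new LazySingleton();
          singleton = local;
        }
      }
    }
    return local;
  }
}
```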
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java:
##########
@@ -117,6 +117,10 @@
DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
private boolean optimizeFooterRead;
+ @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED,
+ DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED)
+ private boolean isAccountThrottlingEnabled;
Review Comment:
cut `is` from the boolean
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingInterceptFactory.java:
##########
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to get an instance of throttling intercept class per account.
+ */
+final class AbfsClientThrottlingInterceptFactory {
+
+ private AbfsClientThrottlingInterceptFactory() {
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ AbfsClientThrottlingInterceptFactory.class);
+
+ // Map which stores instance of ThrottlingIntercept class per account
+ private static Map<String, AbfsClientThrottlingIntercept> instanceMapping
+ = new ConcurrentHashMap<>();
Review Comment:
-1 to this. it will leak memory, especially on long lived hive/spark/impala
processes. see #4966 for a recent problem there.
options
1. use `org.apache.hadoop.util.WeakReferenceMap`
2. have a per FS instance value and pass it down. i would prefer this.
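Option 1 can be sketched with plain java.lang.ref weak references. This is a simplified, non-atomic stand-in for the idea behind `org.apache.hadoop.util.WeakReferenceMap` (in this minimal form, two racing threads may briefly create separate values):

```java
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Values are held weakly, so once no filesystem instance references an
// account's intercept any more, the GC can reclaim it instead of the map
// pinning one entry per account for the life of a long-running process.
class WeakValueCache<K, V> {
  private final ConcurrentHashMap<K, WeakReference<V>> map = new ConcurrentHashMap<>();
  private final Function<K, V> factory;

  WeakValueCache(Function<K, V> factory) {
    this.factory = factory;
  }

  V get(K key) {
    WeakReference<V> ref = map.get(key);
    V value = (ref == null) ? null : ref.get();
    if (value == null) {                // absent, or already collected
      value = factory.apply(key);
      map.put(key, new WeakReference<>(value));
    }
    return value;
  }
}
```

Option 2 avoids the shared map entirely: each filesystem instance owns its intercept and passes it down, so the intercept's lifetime matches the filesystem's.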
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -242,6 +243,11 @@ private void completeExecute(TracingContext tracingContext)
private boolean executeHttpOperation(final int retryCount,
TracingContext tracingContext) throws AzureBlobFileSystemException {
AbfsHttpOperation httpOperation = null;
+ AbfsConfiguration abfsConfiguration = client.getAbfsConfiguration();
+ String accountName = abfsConfiguration.getAccountName();
+ AbfsClientThrottlingIntercept intercept
Review Comment:
add the throttling intercept to the abfsClient so it is shared, rather than
looked up on every api call.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java:
##########
@@ -44,28 +47,20 @@ class AbfsClientThrottlingAnalyzer {
private static final double SLEEP_DECREASE_FACTOR = .975;
private static final double SLEEP_INCREASE_FACTOR = 1.05;
private int analysisPeriodMs;
-
Review Comment:
revert the line cut; makes backporting harder
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java:
##########
@@ -104,19 +113,24 @@ private AbfsClientThrottlingAnalyzer() {
public void addBytesTransferred(long count, boolean isFailedOperation) {
AbfsOperationMetrics metrics = blobMetrics.get();
if (isFailedOperation) {
- metrics.bytesFailed.addAndGet(count);
- metrics.operationsFailed.incrementAndGet();
+ metrics.getBytesFailed().addAndGet(count);
+ metrics.getOperationsFailed().incrementAndGet();
} else {
- metrics.bytesSuccessful.addAndGet(count);
- metrics.operationsSuccessful.incrementAndGet();
+ metrics.getBytesSuccessful().addAndGet(count);
+ metrics.getOperationsSuccessful().incrementAndGet();
}
+ blobMetrics.set(metrics);
}
/**
* Suspends the current storage operation, as necessary, to reduce throughput.
* @return true if Thread sleeps(Throttling occurs) else false.
*/
public boolean suspendIfNecessary() {
+ if (isOperationOnAccountIdle.get()) {
Review Comment:
somehow i want iostatistics to track time spent sleeping here. if a
DurationTrackerFactory is passed in to the analyzer along with a key to update,
then this can be added now/later
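One way to make the sleep measurable now, without committing to an API: time the sleep and accumulate the total. The real suggestion is to pass in an iostatistics `DurationTrackerFactory` with a key; this minimal tracker is a hypothetical placeholder, not hadoop-common's API.

```java
// Records how long suspendIfNecessary() actually slept, so the total could
// later be published through IOStatistics.
class ThrottlingSleepTracker {
  private long totalSleepNanos;

  boolean suspendIfNecessary(long sleepMillis) throws InterruptedException {
    if (sleepMillis <= 0) {
      return false;                       // no throttling required
    }
    long start = System.nanoTime();
    try {
      Thread.sleep(sleepMillis);
    } finally {
      // record even if the sleep is interrupted
      totalSleepNanos += System.nanoTime() - start;
    }
    return true;
  }

  long totalSleepMillis() {
    return totalSleepNanos / 1_000_000;
  }
}
```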
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java:
##########
@@ -18,9 +18,15 @@
package org.apache.hadoop.fs.azurebfs.services;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.Test;
+import java.io.IOException;
Review Comment:
add in a block above the org.apache ones
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java:
##########
@@ -104,19 +113,24 @@ private AbfsClientThrottlingAnalyzer() {
public void addBytesTransferred(long count, boolean isFailedOperation) {
AbfsOperationMetrics metrics = blobMetrics.get();
if (isFailedOperation) {
- metrics.bytesFailed.addAndGet(count);
- metrics.operationsFailed.incrementAndGet();
+ metrics.getBytesFailed().addAndGet(count);
Review Comment:
exposes too much of the metrics internals. better to have
`addBytesFailed(long)` method and similar in AbfsOperationMetrics and invoke
these here
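The resulting call site might look like this sketch (simplified classes, with method names following the review comment):

```java
// Metrics records outcomes through dedicated methods...
class TransferMetrics {
  private long bytesFailed;
  private long bytesSuccessful;

  void addBytesFailed(long count) {
    bytesFailed += count;
  }

  void addBytesSuccessful(long count) {
    bytesSuccessful += count;
  }

  long getBytesFailed() {
    return bytesFailed;
  }

  long getBytesSuccessful() {
    return bytesSuccessful;
  }
}

// ...so the analyzer never touches the counter representation directly.
class TransferAnalyzer {
  private final TransferMetrics metrics = new TransferMetrics();

  void addBytesTransferred(long count, boolean isFailedOperation) {
    if (isFailedOperation) {
      metrics.addBytesFailed(count);
    } else {
      metrics.addBytesSuccessful(count);
    }
  }

  TransferMetrics getMetrics() {
    return metrics;
  }
}
```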
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -67,6 +87,130 @@ public void testDefaultMaxIORetryCount() throws Exception {
testMaxIOConfig(abfsConfig);
}
+ @Test
+ public void testCreateMultipleAccountThrottling() throws Exception {
+ Configuration config = new Configuration(getRawConfiguration());
+ String accountName = config.get(FS_AZURE_ACCOUNT_NAME);
+ if (accountName == null) {
+ // check if accountName is set using different config key
+ accountName = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+ }
+ assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
+ accountName != null && !accountName.isEmpty());
+
+ Configuration rawConfig1 = new Configuration();
+ rawConfig1.addResource(TEST_CONFIGURATION_FILE_NAME);
+
+ AbfsRestOperation successOp = mock(AbfsRestOperation.class);
+ AbfsHttpOperation http500Op = mock(AbfsHttpOperation.class);
+ when(http500Op.getStatusCode()).thenReturn(HTTP_INTERNAL_ERROR);
+ when(successOp.getResult()).thenReturn(http500Op);
+
+ AbfsConfiguration configuration = Mockito.mock(AbfsConfiguration.class);
+ when(configuration.getAnalysisPeriod()).thenReturn(10000);
+ when(configuration.isAutoThrottlingEnabled()).thenReturn(true);
+ when(configuration.isAccountThrottlingEnabled()).thenReturn(false);
+
+ AbfsClientThrottlingIntercept instance1 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+
+ AbfsClientThrottlingIntercept instance2 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+ //if singleton is enabled, for different accounts both the instances should return same value
+ assertEquals(instance1, instance2);
+
+ when(configuration.isAccountThrottlingEnabled()).thenReturn(true);
+ AbfsClientThrottlingIntercept instance3 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ AbfsClientThrottlingIntercept instance4 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+ AbfsClientThrottlingIntercept instance5 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ //if singleton is not enabled, for different accounts instances should return different value
+ assertNotEquals(instance3, instance4);
+
+ //if singleton is not enabled, for same accounts instances should return same value
+ assertEquals(instance3, instance5);
+ }
+
+ @Test
+ public void testOperationOnAccountIdle() throws Exception {
+ //Get the filesystem.
+ AzureBlobFileSystem fs = getFileSystem();
+ AbfsConfiguration configuration1 = fs.getAbfsStore().getClient().getAbfsConfiguration();
+ Assume.assumeTrue(configuration1.isAccountThrottlingEnabled());
+
+ AbfsClientThrottlingIntercept accountIntercept = AbfsClientThrottlingInterceptFactory.getInstance(this.getAccountName(), configuration1);
+ int bufferSize = MIN_BUFFER_SIZE;
+ final byte[] b = new byte[2 * bufferSize];
+ new Random().nextBytes(b);
+
+ Path testPath = path(TEST_PATH);
+
+ //Do an operation on the filesystem.
+ FSDataOutputStream stream = fs.create(testPath);
+
+ try {
+ stream.write(b);
+ } finally{
+ stream.close();
+ }
+
+ //Don't perform any operation on the account.
+ int sleepTime = (int) (getAbfsConfig().getAccountOperationIdleTimeout() * 1.5);
+ Thread.sleep(sleepTime);
+
+ FSDataInputStream streamRead = fs.open(testPath);
Review Comment:
use try-with-resources
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -67,6 +87,130 @@ public void testDefaultMaxIORetryCount() throws Exception {
testMaxIOConfig(abfsConfig);
}
+ @Test
+ public void testCreateMultipleAccountThrottling() throws Exception {
+ Configuration config = new Configuration(getRawConfiguration());
+ String accountName = config.get(FS_AZURE_ACCOUNT_NAME);
+ if (accountName == null) {
+ // check if accountName is set using different config key
+ accountName = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+ }
+ assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
+ accountName != null && !accountName.isEmpty());
+
+ Configuration rawConfig1 = new Configuration();
+ rawConfig1.addResource(TEST_CONFIGURATION_FILE_NAME);
+
+ AbfsRestOperation successOp = mock(AbfsRestOperation.class);
+ AbfsHttpOperation http500Op = mock(AbfsHttpOperation.class);
+ when(http500Op.getStatusCode()).thenReturn(HTTP_INTERNAL_ERROR);
+ when(successOp.getResult()).thenReturn(http500Op);
+
+ AbfsConfiguration configuration = Mockito.mock(AbfsConfiguration.class);
+ when(configuration.getAnalysisPeriod()).thenReturn(10000);
+ when(configuration.isAutoThrottlingEnabled()).thenReturn(true);
+ when(configuration.isAccountThrottlingEnabled()).thenReturn(false);
+
+ AbfsClientThrottlingIntercept instance1 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+
+ AbfsClientThrottlingIntercept instance2 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+ //if singleton is enabled, for different accounts both the instances should return same value
+ assertEquals(instance1, instance2);
+
+ when(configuration.isAccountThrottlingEnabled()).thenReturn(true);
+ AbfsClientThrottlingIntercept instance3 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ AbfsClientThrottlingIntercept instance4 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+ AbfsClientThrottlingIntercept instance5 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ //if singleton is not enabled, for different accounts instances should return different value
+ assertNotEquals(instance3, instance4);
+
+ //if singleton is not enabled, for same accounts instances should return same value
+ assertEquals(instance3, instance5);
+ }
+
+ @Test
+ public void testOperationOnAccountIdle() throws Exception {
+ //Get the filesystem.
+ AzureBlobFileSystem fs = getFileSystem();
+ AbfsConfiguration configuration1 = fs.getAbfsStore().getClient().getAbfsConfiguration();
+ Assume.assumeTrue(configuration1.isAccountThrottlingEnabled());
+
+ AbfsClientThrottlingIntercept accountIntercept = AbfsClientThrottlingInterceptFactory.getInstance(this.getAccountName(), configuration1);
+ int bufferSize = MIN_BUFFER_SIZE;
+ final byte[] b = new byte[2 * bufferSize];
+ new Random().nextBytes(b);
+
+ Path testPath = path(TEST_PATH);
+
+ //Do an operation on the filesystem.
+ FSDataOutputStream stream = fs.create(testPath);
+
+ try {
+ stream.write(b);
+ } finally{
+ stream.close();
+ }
+
+ //Don't perform any operation on the account.
+ int sleepTime = (int) ((getAbfsConfig().getAccountOperationIdleTimeout()) * 1.5);
+ Thread.sleep(sleepTime);
+
+ FSDataInputStream streamRead = fs.open(testPath);
+ try {
+ streamRead.read(b);
+ } finally {
+ streamRead.close();
+ }
+
+ //Perform operations on another account.
+ AzureBlobFileSystem fs1 = new AzureBlobFileSystem();
+ Configuration config = new Configuration(getRawConfiguration());
+ String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+ final String abfsUrl1 = this.getFileSystemName() + "12" + "@" + accountName1;
+ URI defaultUri1 = null;
+ try {
+ defaultUri1 = new URI("abfss", abfsUrl1, null, null, null);
+ } catch (Exception ex) {
+ throw new AssertionError(ex);
+ }
+ fs1.initialize(defaultUri1, getRawConfiguration());
+ AbfsConfiguration configuration2 = fs1.getAbfsStore().getClient().getAbfsConfiguration();
+ AbfsClientThrottlingIntercept accountIntercept1 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration2);
+ FSDataOutputStream stream1 = fs1.create(testPath);
+ try {
+ stream1.write(b);
+ } finally {
+ stream1.close();
+ }
+
+ // Verify the write analyzer for the first account is idle but the read analyzer is not idle.
+ if (accountIntercept != null) {
+ Assert.assertTrue(accountIntercept.getWriteThrottler().getIsOperationOnAccountIdle().get());
Review Comment:
add a message on all these asserts. Think "if this assert failed on a Jenkins run, what would I need to know?"
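As an illustration of the request above (the account name and throttler state below are hypothetical stand-ins, not values from the actual test), a minimal sketch of an assert that carries its own diagnostic message, in the style of JUnit's `assertTrue(String, boolean)` overload:

```java
public class MessagedAssertDemo {
    // Minimal stand-in for org.junit.Assert.assertTrue(String, boolean):
    // the message travels inside the AssertionError, so a CI log alone
    // explains what went wrong without rerunning the test.
    static void assertTrue(String message, boolean condition) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        // hypothetical values standing in for the throttler state under test
        String accountName = "account1";
        boolean writeThrottlerIdle = true;
        assertTrue("expected write throttler for " + accountName
                + " to be idle after the idle timeout elapsed, but it was still active",
            writeThrottlerIdle);
        System.out.println("assert passed");
    }
}
```

On failure the message string is what a Jenkins report shows, which is the point of the review comment.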
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -67,6 +87,130 @@ public void testDefaultMaxIORetryCount() throws Exception {
testMaxIOConfig(abfsConfig);
}
+ @Test
+ public void testCreateMultipleAccountThrottling() throws Exception {
+ Configuration config = new Configuration(getRawConfiguration());
+ String accountName = config.get(FS_AZURE_ACCOUNT_NAME);
+ if (accountName == null) {
+ // check if accountName is set using different config key
+ accountName = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+ }
+ assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
+ accountName != null && !accountName.isEmpty());
+
+ Configuration rawConfig1 = new Configuration();
+ rawConfig1.addResource(TEST_CONFIGURATION_FILE_NAME);
+
+ AbfsRestOperation successOp = mock(AbfsRestOperation.class);
+ AbfsHttpOperation http500Op = mock(AbfsHttpOperation.class);
+ when(http500Op.getStatusCode()).thenReturn(HTTP_INTERNAL_ERROR);
+ when(successOp.getResult()).thenReturn(http500Op);
+
+ AbfsConfiguration configuration = Mockito.mock(AbfsConfiguration.class);
+ when(configuration.getAnalysisPeriod()).thenReturn(10000);
+ when(configuration.isAutoThrottlingEnabled()).thenReturn(true);
+ when(configuration.isAccountThrottlingEnabled()).thenReturn(false);
+
+ AbfsClientThrottlingIntercept instance1 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+
+ AbfsClientThrottlingIntercept instance2 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+ // if the singleton is enabled, different accounts should get the same instance
+ assertEquals(instance1, instance2);
+
+ when(configuration.isAccountThrottlingEnabled()).thenReturn(true);
+ AbfsClientThrottlingIntercept instance3 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ AbfsClientThrottlingIntercept instance4 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+ AbfsClientThrottlingIntercept instance5 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ // if the singleton is not enabled, different accounts should get different instances
+ assertNotEquals(instance3, instance4);
+
+ // if the singleton is not enabled, the same account should get the same instance
+ assertEquals(instance3, instance5);
+ }
+
+ @Test
+ public void testOperationOnAccountIdle() throws Exception {
+ //Get the filesystem.
+ AzureBlobFileSystem fs = getFileSystem();
+ AbfsConfiguration configuration1 = fs.getAbfsStore().getClient().getAbfsConfiguration();
+ Assume.assumeTrue(configuration1.isAccountThrottlingEnabled());
+
+ AbfsClientThrottlingIntercept accountIntercept = AbfsClientThrottlingInterceptFactory.getInstance(this.getAccountName(), configuration1);
+ int bufferSize = MIN_BUFFER_SIZE;
+ final byte[] b = new byte[2 * bufferSize];
+ new Random().nextBytes(b);
+
+ Path testPath = path(TEST_PATH);
+
+ //Do an operation on the filesystem.
+ FSDataOutputStream stream = fs.create(testPath);
+
+ try {
+ stream.write(b);
+ } finally {
+ stream.close();
+ }
+
+ //Don't perform any operation on the account.
+ int sleepTime = (int) ((getAbfsConfig().getAccountOperationIdleTimeout()) * 1.5);
+ Thread.sleep(sleepTime);
+
+ FSDataInputStream streamRead = fs.open(testPath);
+ try {
+ streamRead.read(b);
+ } finally {
+ streamRead.close();
+ }
+
+ //Perform operations on another account.
+ AzureBlobFileSystem fs1 = new AzureBlobFileSystem();
+ Configuration config = new Configuration(getRawConfiguration());
+ String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+ final String abfsUrl1 = this.getFileSystemName() + "12" + "@" + accountName1;
+ URI defaultUri1 = null;
+ try {
+ defaultUri1 = new URI("abfss", abfsUrl1, null, null, null);
+ } catch (Exception ex) {
Review Comment:
no need to wrap as the test throws Exception.
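The simplification being suggested looks roughly like this (a sketch; the helper name and authority string are illustrative, not the PR's actual code): because the enclosing test method declares `throws Exception`, the `URISyntaxException` can simply propagate instead of being caught and rethrown as an AssertionError.

```java
import java.net.URI;

public class UriBuildDemo {
    // No try/catch needed when the caller declares `throws Exception`:
    // a malformed authority fails the test with the URISyntaxException's
    // own stack trace, which is more informative than a wrapped error.
    static URI buildAbfsUri(String abfsUrl) throws Exception {
        return new URI("abfss", abfsUrl, null, null, null);
    }

    public static void main(String[] args) throws Exception {
        // hypothetical filesystem@account authority
        URI uri = buildAbfsUri("[email protected]");
        System.out.println(uri);
    }
}
```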
> ABFS: Support for account level throttling
> ------------------------------------------
>
> Key: HADOOP-18457
> URL: https://issues.apache.org/jira/browse/HADOOP-18457
> Project: Hadoop Common
> Issue Type: Sub-task
> Affects Versions: 3.3.4
> Reporter: Anmol Asrani
> Assignee: Anmol Asrani
> Priority: Major
> Labels: pull-request-available
> Fix For: 3.4.0
>
>
> To add support for throttling at account level
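For readers following only the JIRA summary: the behaviour the new tests above assert can be sketched as a factory that hands out one throttling intercept per storage account when account-level throttling is enabled, and a shared singleton otherwise. All names below are illustrative stand-ins, not the actual ABFS classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AccountThrottlerRegistry {
    // one instance per account when account-level throttling is on
    private static final Map<String, AccountThrottlerRegistry> INSTANCES =
        new ConcurrentHashMap<>();
    // process-wide instance for the legacy singleton behaviour
    private static volatile AccountThrottlerRegistry singleton;

    private final String accountName;

    private AccountThrottlerRegistry(String accountName) {
        this.accountName = accountName;
    }

    public static AccountThrottlerRegistry getInstance(
            String accountName, boolean accountThrottlingEnabled) {
        if (!accountThrottlingEnabled) {
            // singleton mode: every account shares one instance
            if (singleton == null) {
                synchronized (AccountThrottlerRegistry.class) {
                    if (singleton == null) {
                        singleton = new AccountThrottlerRegistry("global");
                    }
                }
            }
            return singleton;
        }
        // per-account mode: lazily create and cache one instance per account
        return INSTANCES.computeIfAbsent(accountName,
            AccountThrottlerRegistry::new);
    }

    public String getAccountName() {
        return accountName;
    }
}
```

This mirrors the assertions in `testCreateMultipleAccountThrottling`: same instance across accounts in singleton mode, distinct instances per account (but stable for a given account) otherwise.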
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]