steveloughran commented on code in PR #5034:
URL: https://github.com/apache/hadoop/pull/5034#discussion_r999378107
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java:
##########
@@ -32,6 +32,7 @@
public final class FileSystemConfigurations {
public static final String DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "";
+ public static final boolean DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = true;
Review Comment:
1. add with the other changes in this file to make cherrypick easier.
2. Why is this the default?
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -25,6 +25,7 @@
import java.net.UnknownHostException;
import java.util.List;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
Review Comment:
check your ide settings of import placement, this should be in the next block
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -46,25 +48,49 @@ public final class AbfsClientThrottlingIntercept {
private AbfsClientThrottlingAnalyzer readThrottler = null;
private AbfsClientThrottlingAnalyzer writeThrottler = null;
private static boolean isAutoThrottlingEnabled = false;
+ private String accountName = "";
+
+ private synchronized void setIsAutoThrottlingEnabled(boolean autoThrottlingEnabled) {
+ isAutoThrottlingEnabled = autoThrottlingEnabled;
+ }
// Hide default constructor
- private AbfsClientThrottlingIntercept() {
- readThrottler = new AbfsClientThrottlingAnalyzer("read");
- writeThrottler = new AbfsClientThrottlingAnalyzer("write");
+ public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsConfiguration) {
+ setIsAutoThrottlingEnabled(abfsConfiguration.isAutoThrottlingEnabled());
+ this.accountName = accountName;
+ this.readThrottler = setAnalyzer("read " + accountName, abfsConfiguration);
+ this.writeThrottler = setAnalyzer("write " + accountName, abfsConfiguration);
+ LOG.debug("Client-side throttling is enabled for the ABFS file system for the account : {}", accountName);
}
- public static synchronized void initializeSingleton(boolean enableAutoThrottling) {
- if (!enableAutoThrottling) {
- return;
- }
+ // Hide default constructor
+ private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
+ readThrottler = setAnalyzer("read", abfsConfiguration);
+ writeThrottler = setAnalyzer("write", abfsConfiguration);
+ }
+
+ private AbfsClientThrottlingAnalyzer setAnalyzer(String name, AbfsConfiguration abfsConfiguration) {
+ return new AbfsClientThrottlingAnalyzer(name, abfsConfiguration);
+ }
+
+ AbfsClientThrottlingAnalyzer getReadThrottler() {
+ return readThrottler;
+ }
+
+ AbfsClientThrottlingAnalyzer getWriteThrottler() {
+ return writeThrottler;
+ }
+
+ public static synchronized AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsConfiguration) {
Review Comment:
can you add javadocs for these methods; the hadoop-azure module is underdocumented, but that is no reason not to add some yourself
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -20,12 +20,14 @@
import java.net.HttpURLConnection;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
Review Comment:
import ordering wrong
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -278,8 +284,9 @@ private boolean executeHttpOperation(final int retryCount,
// dump the headers
AbfsIoUtils.dumpHeadersToDebugLog("Request Headers",
httpOperation.getConnection().getRequestProperties());
- AbfsClientThrottlingIntercept.sendingRequest(operationType, abfsCounters);
-
+ if (intercept != null) {
Review Comment:
by having the abfsclient/analyzer factory return a `NoopAbfsClientThrottlingIntercept` interceptor when interception is disabled, there will be no need for this check
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -46,25 +48,49 @@ public final class AbfsClientThrottlingIntercept {
private AbfsClientThrottlingAnalyzer readThrottler = null;
private AbfsClientThrottlingAnalyzer writeThrottler = null;
private static boolean isAutoThrottlingEnabled = false;
+ private String accountName = "";
+
+ private synchronized void setIsAutoThrottlingEnabled(boolean autoThrottlingEnabled) {
+ isAutoThrottlingEnabled = autoThrottlingEnabled;
+ }
// Hide default constructor
- private AbfsClientThrottlingIntercept() {
- readThrottler = new AbfsClientThrottlingAnalyzer("read");
- writeThrottler = new AbfsClientThrottlingAnalyzer("write");
+ public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsConfiguration) {
Review Comment:
factor this out into the interface `AbfsClientThrottlingIntercept` and implementation `AnalyzingAbfsClientThrottlingIntercept`. Then add a no-op intercept, `NoopAbfsClientThrottlingIntercept`, which can be returned from lookups when intercept is off, ensuring there's no need to check for null values in the rest operation
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOperationMetrics.java:
##########
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Stores Abfs operation metrics during each analysis period.
+ */
+class AbfsOperationMetrics {
Review Comment:
have getters return the actual long value; add methods to add() each counter type and invoke them. This keeps the fact that atomic longs are used an implementation detail.
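The encapsulation being asked for looks roughly like this; the class and method names below are illustrative, not the actual `AbfsOperationMetrics` members:

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch: callers see only plain long reads and add/increment mutators,
// so the use of AtomicLong stays an implementation detail.
class OperationMetrics {
    private final AtomicLong bytesFailed = new AtomicLong();
    private final AtomicLong operationsFailed = new AtomicLong();

    void addBytesFailed(long count) {
        bytesFailed.addAndGet(count);
    }

    void incrementOperationsFailed() {
        operationsFailed.incrementAndGet();
    }

    // plain long, not the AtomicLong object
    long getBytesFailed() {
        return bytesFailed.get();
    }

    long getOperationsFailed() {
        return operationsFailed.get();
    }
}
```

The analyzer would then call `metrics.addBytesFailed(count)` instead of `metrics.getBytesFailed().addAndGet(count)`.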
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -242,6 +243,11 @@ private void completeExecute(TracingContext tracingContext)
private boolean executeHttpOperation(final int retryCount,
TracingContext tracingContext) throws AzureBlobFileSystemException {
AbfsHttpOperation httpOperation = null;
+ AbfsConfiguration abfsConfiguration = client.getAbfsConfiguration();
+ String accountName = abfsConfiguration.getAccountName();
+ AbfsClientThrottlingIntercept intercept
+ = AbfsClientThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration);
Review Comment:
nit, put the = on the line above
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -67,6 +87,130 @@ public void testDefaultMaxIORetryCount() throws Exception {
testMaxIOConfig(abfsConfig);
}
+ @Test
+ public void testCreateMultipleAccountThrottling() throws Exception {
+ Configuration config = new Configuration(getRawConfiguration());
+ String accountName = config.get(FS_AZURE_ACCOUNT_NAME);
+ if (accountName == null) {
+ // check if accountName is set using different config key
+ accountName = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+ }
+ assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
+ accountName != null && !accountName.isEmpty());
+
+ Configuration rawConfig1 = new Configuration();
+ rawConfig1.addResource(TEST_CONFIGURATION_FILE_NAME);
+
+ AbfsRestOperation successOp = mock(AbfsRestOperation.class);
+ AbfsHttpOperation http500Op = mock(AbfsHttpOperation.class);
+ when(http500Op.getStatusCode()).thenReturn(HTTP_INTERNAL_ERROR);
+ when(successOp.getResult()).thenReturn(http500Op);
+
+ AbfsConfiguration configuration = Mockito.mock(AbfsConfiguration.class);
+ when(configuration.getAnalysisPeriod()).thenReturn(10000);
+ when(configuration.isAutoThrottlingEnabled()).thenReturn(true);
+ when(configuration.isAccountThrottlingEnabled()).thenReturn(false);
+
+ AbfsClientThrottlingIntercept instance1 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+
+ AbfsClientThrottlingIntercept instance2 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+ //if singleton is enabled, for different accounts both the instances should return same value
+ assertEquals(instance1, instance2);
+
+ when(configuration.isAccountThrottlingEnabled()).thenReturn(true);
+ AbfsClientThrottlingIntercept instance3 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ AbfsClientThrottlingIntercept instance4 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+ AbfsClientThrottlingIntercept instance5 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ //if singleton is not enabled, for different accounts instances should return different value
+ assertNotEquals(instance3, instance4);
+
+ //if singleton is not enabled, for same accounts instances should return same value
+ assertEquals(instance3, instance5);
+ }
+
+ @Test
+ public void testOperationOnAccountIdle() throws Exception {
+ //Get the filesystem.
+ AzureBlobFileSystem fs = getFileSystem();
+ AbfsConfiguration configuration1 = fs.getAbfsStore().getClient().getAbfsConfiguration();
+ Assume.assumeTrue(configuration1.isAccountThrottlingEnabled());
+
+ AbfsClientThrottlingIntercept accountIntercept = AbfsClientThrottlingInterceptFactory.getInstance(this.getAccountName(),
+ configuration1);
+ int bufferSize = MIN_BUFFER_SIZE;
+ final byte[] b = new byte[2 * bufferSize];
+ new Random().nextBytes(b);
+
+ Path testPath = path(TEST_PATH);
+
+ //Do an operation on the filesystem.
+ FSDataOutputStream stream = fs.create(testPath);
Review Comment:
use try-with-resources
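The shape the reviewer is asking for, in miniature. `Recorder` is an illustrative stand-in for an output stream such as the one returned by `fs.create(testPath)`, chosen so the sketch runs without checked exceptions:

```java
// try-with-resources guarantees close() runs when the block exits,
// even if the body throws, replacing the manual try/finally.
class Recorder implements AutoCloseable {
    boolean closed = false;
    final StringBuilder log = new StringBuilder();

    void write(String s) {
        log.append(s);
    }

    @Override
    public void close() {
        closed = true;
    }
}

class TryWithResourcesDemo {
    // returns the recorder so callers can verify it was closed
    static Recorder writeAll(String... chunks) {
        Recorder r = new Recorder();
        try (Recorder stream = r) {   // closed automatically on exit
            for (String c : chunks) {
                stream.write(c);
            }
        }
        return r;
    }
}
```

In the test this would be `try (FSDataOutputStream stream = fs.create(testPath)) { stream.write(b); }` with no explicit `finally` block.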
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -46,25 +48,49 @@ public final class AbfsClientThrottlingIntercept {
private AbfsClientThrottlingAnalyzer readThrottler = null;
private AbfsClientThrottlingAnalyzer writeThrottler = null;
private static boolean isAutoThrottlingEnabled = false;
+ private String accountName = "";
+
+ private synchronized void setIsAutoThrottlingEnabled(boolean
autoThrottlingEnabled) {
Review Comment:
use `setAutoThrottlingEnabled`, as it reads better. Maybe rename the field to match.
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -67,6 +87,130 @@ public void testDefaultMaxIORetryCount() throws Exception {
testMaxIOConfig(abfsConfig);
}
+ @Test
+ public void testCreateMultipleAccountThrottling() throws Exception {
+ Configuration config = new Configuration(getRawConfiguration());
+ String accountName = config.get(FS_AZURE_ACCOUNT_NAME);
+ if (accountName == null) {
+ // check if accountName is set using different config key
+ accountName = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+ }
+ assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
+ accountName != null && !accountName.isEmpty());
+
+ Configuration rawConfig1 = new Configuration();
+ rawConfig1.addResource(TEST_CONFIGURATION_FILE_NAME);
+
+ AbfsRestOperation successOp = mock(AbfsRestOperation.class);
+ AbfsHttpOperation http500Op = mock(AbfsHttpOperation.class);
+ when(http500Op.getStatusCode()).thenReturn(HTTP_INTERNAL_ERROR);
+ when(successOp.getResult()).thenReturn(http500Op);
+
+ AbfsConfiguration configuration = Mockito.mock(AbfsConfiguration.class);
+ when(configuration.getAnalysisPeriod()).thenReturn(10000);
+ when(configuration.isAutoThrottlingEnabled()).thenReturn(true);
+ when(configuration.isAccountThrottlingEnabled()).thenReturn(false);
+
+ AbfsClientThrottlingIntercept instance1 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+
+ AbfsClientThrottlingIntercept instance2 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+ //if singleton is enabled, for different accounts both the instances should return same value
+ assertEquals(instance1, instance2);
+
+ when(configuration.isAccountThrottlingEnabled()).thenReturn(true);
+ AbfsClientThrottlingIntercept instance3 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ AbfsClientThrottlingIntercept instance4 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+ AbfsClientThrottlingIntercept instance5 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ //if singleton is not enabled, for different accounts instances should return different value
+ assertNotEquals(instance3, instance4);
+
+ //if singleton is not enabled, for same accounts instances should return same value
+ assertEquals(instance3, instance5);
+ }
+
+ @Test
+ public void testOperationOnAccountIdle() throws Exception {
+ //Get the filesystem.
+ AzureBlobFileSystem fs = getFileSystem();
+ AbfsConfiguration configuration1 = fs.getAbfsStore().getClient().getAbfsConfiguration();
+ Assume.assumeTrue(configuration1.isAccountThrottlingEnabled());
+
+ AbfsClientThrottlingIntercept accountIntercept = AbfsClientThrottlingInterceptFactory.getInstance(this.getAccountName(),
+ configuration1);
+ int bufferSize = MIN_BUFFER_SIZE;
+ final byte[] b = new byte[2 * bufferSize];
+ new Random().nextBytes(b);
+
+ Path testPath = path(TEST_PATH);
+
+ //Do an operation on the filesystem.
+ FSDataOutputStream stream = fs.create(testPath);
+
+ try {
+ stream.write(b);
+ } finally{
+ stream.close();
+ }
+
+ //Don't perform any operation on the account.
+ int sleepTime = (int)((getAbfsConfig().getAccountOperationIdleTimeout()) * 1.5);
+ Thread.sleep(sleepTime);
+
+ FSDataInputStream streamRead = fs.open(testPath);
+ try {
+ streamRead.read(b);
+ } finally{
+ streamRead.close();
+ }
+
+ //Perform operations on another account.
+ AzureBlobFileSystem fs1 = new AzureBlobFileSystem();
+ Configuration config = new Configuration(getRawConfiguration());
+ String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+ final String abfsUrl1 = this.getFileSystemName() + "12" + "@" + accountName1;
+ URI defaultUri1 = null;
+ try {
+ defaultUri1 = new URI("abfss", abfsUrl1, null, null, null);
+ } catch (Exception ex) {
+ throw new AssertionError(ex);
+ }
+ fs1.initialize(defaultUri1, getRawConfiguration());
+ AbfsConfiguration configuration2 = fs1.getAbfsStore().getClient().getAbfsConfiguration();
+ AbfsClientThrottlingIntercept accountIntercept1 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1,
+ configuration2);
+ FSDataOutputStream stream1 = fs1.create(testPath);
+ try {
+ stream1.write(b);
+ } finally{
+ stream1.close();
+ }
+
+ //Verify the write analyzer for first account is idle but the read analyzer is not idle.
+ if (accountIntercept != null){
+ Assert.assertTrue(accountIntercept.getWriteThrottler().getIsOperationOnAccountIdle().get());
+ Assert.assertFalse(accountIntercept.getReadThrottler().getIsOperationOnAccountIdle().get());
Review Comment:
adding static imports of these asserts would be good
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -46,25 +48,49 @@ public final class AbfsClientThrottlingIntercept {
private AbfsClientThrottlingAnalyzer readThrottler = null;
private AbfsClientThrottlingAnalyzer writeThrottler = null;
private static boolean isAutoThrottlingEnabled = false;
+ private String accountName = "";
Review Comment:
make final and set in both constructors
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -20,12 +20,14 @@
import java.net.HttpURLConnection;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+
Review Comment:
cut to ease cherrypicking
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -46,25 +48,49 @@ public final class AbfsClientThrottlingIntercept {
private AbfsClientThrottlingAnalyzer readThrottler = null;
private AbfsClientThrottlingAnalyzer writeThrottler = null;
private static boolean isAutoThrottlingEnabled = false;
+ private String accountName = "";
+
+ private synchronized void setIsAutoThrottlingEnabled(boolean autoThrottlingEnabled) {
+ isAutoThrottlingEnabled = autoThrottlingEnabled;
+ }
// Hide default constructor
- private AbfsClientThrottlingIntercept() {
- readThrottler = new AbfsClientThrottlingAnalyzer("read");
- writeThrottler = new AbfsClientThrottlingAnalyzer("write");
+ public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsConfiguration) {
+ setIsAutoThrottlingEnabled(abfsConfiguration.isAutoThrottlingEnabled());
+ this.accountName = accountName;
+ this.readThrottler = setAnalyzer("read " + accountName, abfsConfiguration);
+ this.writeThrottler = setAnalyzer("write " + accountName, abfsConfiguration);
+ LOG.debug("Client-side throttling is enabled for the ABFS file system for the account : {}", accountName);
}
- public static synchronized void initializeSingleton(boolean enableAutoThrottling) {
- if (!enableAutoThrottling) {
- return;
- }
+ // Hide default constructor
+ private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
+ readThrottler = setAnalyzer("read", abfsConfiguration);
+ writeThrottler = setAnalyzer("write", abfsConfiguration);
+ }
+
+ private AbfsClientThrottlingAnalyzer setAnalyzer(String name, AbfsConfiguration abfsConfiguration) {
+ return new AbfsClientThrottlingAnalyzer(name, abfsConfiguration);
+ }
+
+ AbfsClientThrottlingAnalyzer getReadThrottler() {
+ return readThrottler;
+ }
+
+ AbfsClientThrottlingAnalyzer getWriteThrottler() {
+ return writeThrottler;
+ }
+
+ public static synchronized AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsConfiguration) {
if (singleton == null) {
Review Comment:
how often is this method invoked?
it may be better to do the singleton == null check before entering the synchronized block, so if it is non-null the current value is returned without any lock acquisition.
if you do that, a double check is needed; other bits of the code do this if you look
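A minimal sketch of the double-checked locking pattern being described, with a simplified stand-in class rather than the actual `AbfsClientThrottlingIntercept`:

```java
// Double-checked locking: the fast path reads the volatile field without
// locking; only callers that race the first initialization pay for the
// synchronized block, and the second check under the lock prevents
// constructing the singleton twice.
class ThrottlingSingleton {
    private static volatile ThrottlingSingleton singleton;

    private ThrottlingSingleton() { }

    static ThrottlingSingleton initializeSingleton() {
        ThrottlingSingleton local = singleton;
        if (local == null) {                          // first check, no lock
            synchronized (ThrottlingSingleton.class) {
                local = singleton;
                if (local == null) {                  // second check, under lock
                    singleton = local = new ThrottlingSingleton();
                }
            }
        }
        return local;
    }
}
```

The `volatile` modifier is required for this to be safe under the Java memory model; without it another thread could observe a partially constructed instance.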
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java:
##########
@@ -117,6 +117,10 @@
DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
private boolean optimizeFooterRead;
+ @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED,
+ DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED)
+ private boolean isAccountThrottlingEnabled;
Review Comment:
cut `is` from the boolean
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingInterceptFactory.java:
##########
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to get an instance of throttling intercept class per account.
+ */
+final class AbfsClientThrottlingInterceptFactory {
+
+ private AbfsClientThrottlingInterceptFactory() {
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ AbfsClientThrottlingInterceptFactory.class);
+
+ // Map which stores instance of ThrottlingIntercept class per account
+ private static Map<String, AbfsClientThrottlingIntercept> instanceMapping
+ = new ConcurrentHashMap<>();
Review Comment:
-1 to this. it will leak memory, especially on long-lived hive/spark/impala processes. see #4966 for a recent problem there.
options
1. use `org.apache.hadoop.util.WeakReferenceMap`
2. have a per FS instance value and pass it down. i would prefer this.
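To illustrate option 1, here is a weak-value cache sketch using only JDK types. This is not the API of `org.apache.hadoop.util.WeakReferenceMap`, just the general idea: once no filesystem instance strongly references an intercept, its map entry becomes collectable instead of pinning memory forever.

```java
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Per-key cache whose values are held via WeakReference, so entries can
// be garbage collected when nothing else references the cached value.
class WeakValueCache<K, V> {
    private final Map<K, WeakReference<V>> map = new ConcurrentHashMap<>();

    synchronized V get(K key, Function<K, V> creator) {
        WeakReference<V> ref = map.get(key);
        V value = (ref == null) ? null : ref.get();
        if (value == null) {
            // entry was never created, or its value was collected: rebuild it
            value = creator.apply(key);
            map.put(key, new WeakReference<>(value));
        }
        return value;
    }
}
```

Note the stale `WeakReference` entries themselves still need periodic pruning; a per-FS-instance value (option 2) avoids the bookkeeping entirely, which is presumably why the reviewer prefers it.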
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -242,6 +243,11 @@ private void completeExecute(TracingContext tracingContext)
private boolean executeHttpOperation(final int retryCount,
TracingContext tracingContext) throws AzureBlobFileSystemException {
AbfsHttpOperation httpOperation = null;
+ AbfsConfiguration abfsConfiguration = client.getAbfsConfiguration();
+ String accountName = abfsConfiguration.getAccountName();
+ AbfsClientThrottlingIntercept intercept
Review Comment:
add the throttling intercept to the abfsClient so it is shared, rather than
looked up on every api call.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java:
##########
@@ -44,28 +47,20 @@ class AbfsClientThrottlingAnalyzer {
private static final double SLEEP_DECREASE_FACTOR = .975;
private static final double SLEEP_INCREASE_FACTOR = 1.05;
private int analysisPeriodMs;
-
Review Comment:
revert the line cut; makes backporting harder
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java:
##########
@@ -104,19 +113,24 @@ private AbfsClientThrottlingAnalyzer() {
public void addBytesTransferred(long count, boolean isFailedOperation) {
AbfsOperationMetrics metrics = blobMetrics.get();
if (isFailedOperation) {
- metrics.bytesFailed.addAndGet(count);
- metrics.operationsFailed.incrementAndGet();
+ metrics.getBytesFailed().addAndGet(count);
+ metrics.getOperationsFailed().incrementAndGet();
} else {
- metrics.bytesSuccessful.addAndGet(count);
- metrics.operationsSuccessful.incrementAndGet();
+ metrics.getBytesSuccessful().addAndGet(count);
+ metrics.getOperationsSuccessful().incrementAndGet();
}
+ blobMetrics.set(metrics);
}
/**
* Suspends the current storage operation, as necessary, to reduce throughput.
* @return true if Thread sleeps(Throttling occurs) else false.
*/
public boolean suspendIfNecessary() {
+ if (isOperationOnAccountIdle.get()) {
Review Comment:
somehow i want iostatistics to track time spent sleeping here. if a
DurationTrackerFactory is passed in to the analyzer along with a key to update,
then this can be added now/later
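One possible shape for this, with `DurationTracker`/`DurationTrackerFactory` as simplified stand-ins rather than the real iostatistics signatures, and `"throttle_sleep"` as a made-up statistic key:

```java
// Sketch: wrap the throttling sleep in a duration tracker so iostatistics
// can record time spent blocked.
interface DurationTracker extends AutoCloseable {
    @Override
    void close();  // narrowed so close() throws no checked exception
}

interface DurationTrackerFactory {
    DurationTracker trackDuration(String key);
}

class ThrottlingSleeper {
    private final DurationTrackerFactory trackerFactory;

    ThrottlingSleeper(DurationTrackerFactory trackerFactory) {
        this.trackerFactory = trackerFactory;
    }

    /** Sleep for the computed delay, recording the time spent under a statistic key. */
    boolean suspendIfNecessary(long sleepMs) {
        if (sleepMs <= 0) {
            return false;  // nothing to do; no tracker started
        }
        try (DurationTracker ignored = trackerFactory.trackDuration("throttle_sleep")) {
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // preserve interrupt status
            }
        }
        return true;
    }
}
```

Passing the factory into the analyzer's constructor, as the comment suggests, would let the stopwatch plumbing be added now and wired to real statistics later.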
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java:
##########
@@ -18,9 +18,15 @@
package org.apache.hadoop.fs.azurebfs.services;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.Test;
+import java.io.IOException;
Review Comment:
add in a block above the org.apache ones
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java:
##########
@@ -104,19 +113,24 @@ private AbfsClientThrottlingAnalyzer() {
public void addBytesTransferred(long count, boolean isFailedOperation) {
AbfsOperationMetrics metrics = blobMetrics.get();
if (isFailedOperation) {
- metrics.bytesFailed.addAndGet(count);
- metrics.operationsFailed.incrementAndGet();
+ metrics.getBytesFailed().addAndGet(count);
Review Comment:
exposes too much of the metrics internals. better to have an `addBytesFailed(long)` method and similar in AbfsOperationMetrics and invoke these here
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -67,6 +87,130 @@ public void testDefaultMaxIORetryCount() throws Exception {
testMaxIOConfig(abfsConfig);
}
+ @Test
+ public void testCreateMultipleAccountThrottling() throws Exception {
+ Configuration config = new Configuration(getRawConfiguration());
+ String accountName = config.get(FS_AZURE_ACCOUNT_NAME);
+ if (accountName == null) {
+ // check if accountName is set using different config key
+ accountName = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+ }
+ assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
+ accountName != null && !accountName.isEmpty());
+
+ Configuration rawConfig1 = new Configuration();
+ rawConfig1.addResource(TEST_CONFIGURATION_FILE_NAME);
+
+ AbfsRestOperation successOp = mock(AbfsRestOperation.class);
+ AbfsHttpOperation http500Op = mock(AbfsHttpOperation.class);
+ when(http500Op.getStatusCode()).thenReturn(HTTP_INTERNAL_ERROR);
+ when(successOp.getResult()).thenReturn(http500Op);
+
+ AbfsConfiguration configuration = Mockito.mock(AbfsConfiguration.class);
+ when(configuration.getAnalysisPeriod()).thenReturn(10000);
+ when(configuration.isAutoThrottlingEnabled()).thenReturn(true);
+ when(configuration.isAccountThrottlingEnabled()).thenReturn(false);
+
+ AbfsClientThrottlingIntercept instance1 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+
+ AbfsClientThrottlingIntercept instance2 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+ //if singleton is enabled, for different accounts both the instances should return same value
+ assertEquals(instance1, instance2);
+
+ when(configuration.isAccountThrottlingEnabled()).thenReturn(true);
+ AbfsClientThrottlingIntercept instance3 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ AbfsClientThrottlingIntercept instance4 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+ AbfsClientThrottlingIntercept instance5 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ //if singleton is not enabled, for different accounts instances should return different value
+ assertNotEquals(instance3, instance4);
+
+ //if singleton is not enabled, for same accounts instances should return same value
+ assertEquals(instance3, instance5);
+ }
+
+ @Test
+ public void testOperationOnAccountIdle() throws Exception {
+ //Get the filesystem.
+ AzureBlobFileSystem fs = getFileSystem();
+ AbfsConfiguration configuration1 = fs.getAbfsStore().getClient().getAbfsConfiguration();
+ Assume.assumeTrue(configuration1.isAccountThrottlingEnabled());
+
+ AbfsClientThrottlingIntercept accountIntercept = AbfsClientThrottlingInterceptFactory.getInstance(this.getAccountName(),
+ configuration1);
+ int bufferSize = MIN_BUFFER_SIZE;
+ final byte[] b = new byte[2 * bufferSize];
+ new Random().nextBytes(b);
+
+ Path testPath = path(TEST_PATH);
+
+ //Do an operation on the filesystem.
+ FSDataOutputStream stream = fs.create(testPath);
+
+ try {
+ stream.write(b);
+ } finally{
+ stream.close();
+ }
+
+ //Don't perform any operation on the account.
+ int sleepTime = (int)((getAbfsConfig().getAccountOperationIdleTimeout()) * 1.5);
+ Thread.sleep(sleepTime);
+
+ FSDataInputStream streamRead = fs.open(testPath);
Review Comment:
use try-with-resources
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -67,6 +87,130 @@ public void testDefaultMaxIORetryCount() throws Exception {
testMaxIOConfig(abfsConfig);
}
+ @Test
+ public void testCreateMultipleAccountThrottling() throws Exception {
+ Configuration config = new Configuration(getRawConfiguration());
+ String accountName = config.get(FS_AZURE_ACCOUNT_NAME);
+ if (accountName == null) {
+ // check if accountName is set using different config key
+ accountName = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+ }
+ assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
+ accountName != null && !accountName.isEmpty());
+
+ Configuration rawConfig1 = new Configuration();
+ rawConfig1.addResource(TEST_CONFIGURATION_FILE_NAME);
+
+ AbfsRestOperation successOp = mock(AbfsRestOperation.class);
+ AbfsHttpOperation http500Op = mock(AbfsHttpOperation.class);
+ when(http500Op.getStatusCode()).thenReturn(HTTP_INTERNAL_ERROR);
+ when(successOp.getResult()).thenReturn(http500Op);
+
+ AbfsConfiguration configuration = Mockito.mock(AbfsConfiguration.class);
+ when(configuration.getAnalysisPeriod()).thenReturn(10000);
+ when(configuration.isAutoThrottlingEnabled()).thenReturn(true);
+ when(configuration.isAccountThrottlingEnabled()).thenReturn(false);
+
+ AbfsClientThrottlingIntercept instance1 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+
+ AbfsClientThrottlingIntercept instance2 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+ //if singleton is enabled, for different accounts both the instances should return same value
+ assertEquals(instance1, instance2);
+
+ when(configuration.isAccountThrottlingEnabled()).thenReturn(true);
+ AbfsClientThrottlingIntercept instance3 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ AbfsClientThrottlingIntercept instance4 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+ AbfsClientThrottlingIntercept instance5 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ //if singleton is not enabled, for different accounts instances should return different value
+ assertNotEquals(instance3, instance4);
+
+ //if singleton is not enabled, for same accounts instances should return same value
+ assertEquals(instance3, instance5);
+ }
+
+ @Test
+ public void testOperationOnAccountIdle() throws Exception {
+ //Get the filesystem.
+ AzureBlobFileSystem fs = getFileSystem();
+ AbfsConfiguration configuration1 = fs.getAbfsStore().getClient().getAbfsConfiguration();
+ Assume.assumeTrue(configuration1.isAccountThrottlingEnabled());
+
+ AbfsClientThrottlingIntercept accountIntercept = AbfsClientThrottlingInterceptFactory.getInstance(this.getAccountName(),
+ configuration1);
+ int bufferSize = MIN_BUFFER_SIZE;
+ final byte[] b = new byte[2 * bufferSize];
+ new Random().nextBytes(b);
+
+ Path testPath = path(TEST_PATH);
+
+ //Do an operation on the filesystem.
+ FSDataOutputStream stream = fs.create(testPath);
+
+ try {
+ stream.write(b);
+ } finally{
+ stream.close();
+ }
+
+ //Don't perform any operation on the account.
+ int sleepTime = (int)((getAbfsConfig().getAccountOperationIdleTimeout()) * 1.5);
+ Thread.sleep(sleepTime);
+
+ FSDataInputStream streamRead = fs.open(testPath);
+ try {
+ streamRead.read(b);
+ } finally {
+ streamRead.close();
+ }
+
+ //Perform operations on another account.
+ AzureBlobFileSystem fs1 = new AzureBlobFileSystem();
+ Configuration config = new Configuration(getRawConfiguration());
+ String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+ final String abfsUrl1 = this.getFileSystemName() + "12" + "@" + accountName1;
+ URI defaultUri1 = null;
+ try {
+ defaultUri1 = new URI("abfss", abfsUrl1, null, null, null);
+ } catch (Exception ex) {
+ throw new AssertionError(ex);
+ }
+ fs1.initialize(defaultUri1, getRawConfiguration());
+ AbfsConfiguration configuration2 = fs1.getAbfsStore().getClient().getAbfsConfiguration();
+ AbfsClientThrottlingIntercept accountIntercept1 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1,
+ configuration2);
+ FSDataOutputStream stream1 = fs1.create(testPath);
+ try {
+ stream1.write(b);
+ } finally {
+ stream1.close();
+ }
+
+ // Verify the write analyzer for the first account is idle but the read analyzer is not idle.
+ if (accountIntercept != null) {
+
+ Assert.assertTrue(accountIntercept.getWriteThrottler().getIsOperationOnAccountIdle().get());
Review Comment:
add a message on all these asserts. think "if this assert failed on a jenkins run, what would I need to know?"
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -67,6 +87,130 @@ public void testDefaultMaxIORetryCount() throws Exception {
testMaxIOConfig(abfsConfig);
}
+ @Test
+ public void testCreateMultipleAccountThrottling() throws Exception {
+ Configuration config = new Configuration(getRawConfiguration());
+ String accountName = config.get(FS_AZURE_ACCOUNT_NAME);
+ if (accountName == null) {
+ // check if accountName is set using different config key
+ accountName = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+ }
+ assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
+ accountName != null && !accountName.isEmpty());
+
+ Configuration rawConfig1 = new Configuration();
+ rawConfig1.addResource(TEST_CONFIGURATION_FILE_NAME);
+
+ AbfsRestOperation successOp = mock(AbfsRestOperation.class);
+ AbfsHttpOperation http500Op = mock(AbfsHttpOperation.class);
+ when(http500Op.getStatusCode()).thenReturn(HTTP_INTERNAL_ERROR);
+ when(successOp.getResult()).thenReturn(http500Op);
+
+ AbfsConfiguration configuration = Mockito.mock(AbfsConfiguration.class);
+ when(configuration.getAnalysisPeriod()).thenReturn(10000);
+ when(configuration.isAutoThrottlingEnabled()).thenReturn(true);
+ when(configuration.isAccountThrottlingEnabled()).thenReturn(false);
+
+ AbfsClientThrottlingIntercept instance1 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+
+ AbfsClientThrottlingIntercept instance2 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+ // if the singleton is enabled, the same instance should be returned for different accounts
+ assertEquals(instance1, instance2);
+
+ when(configuration.isAccountThrottlingEnabled()).thenReturn(true);
+ AbfsClientThrottlingIntercept instance3 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ AbfsClientThrottlingIntercept instance4 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+ AbfsClientThrottlingIntercept instance5 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+ // if the singleton is not enabled, different accounts should get different instances
+ assertNotEquals(instance3, instance4);
+
+ // if the singleton is not enabled, the same account should get the same instance
+ assertEquals(instance3, instance5);
+ }
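The behaviour this test exercises can be sketched as a generic factory; this is a hypothetical illustration of the singleton-vs-per-account pattern, not the actual ABFS code (class and method names are made up):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// With account-level throttling disabled, one shared intercept is returned
// for every account; with it enabled, each account gets its own instance.
public class InterceptFactorySketch {
    static final Object SINGLETON = new Object();
    static final Map<String, Object> PER_ACCOUNT = new ConcurrentHashMap<>();

    static Object getInstance(String accountName, boolean accountThrottlingEnabled) {
        if (!accountThrottlingEnabled) {
            return SINGLETON;  // same object regardless of account
        }
        return PER_ACCOUNT.computeIfAbsent(accountName, k -> new Object());
    }

    public static void main(String[] args) {
        Object a = getInstance("acct1", false);
        Object b = getInstance("acct2", false);
        System.out.println("singleton mode, same instance: " + (a == b));

        Object c = getInstance("acct1", true);
        Object d = getInstance("acct2", true);
        Object e = getInstance("acct1", true);
        System.out.println("per-account mode, different accounts differ: " + (c != d));
        System.out.println("per-account mode, same account reuses: " + (c == e));
    }
}
```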
+
+ @Test
+ public void testOperationOnAccountIdle() throws Exception {
+ //Get the filesystem.
+ AzureBlobFileSystem fs = getFileSystem();
+ AbfsConfiguration configuration1 = fs.getAbfsStore().getClient().getAbfsConfiguration();
+ Assume.assumeTrue(configuration1.isAccountThrottlingEnabled());
+
+ AbfsClientThrottlingIntercept accountIntercept = AbfsClientThrottlingInterceptFactory.getInstance(this.getAccountName(),
+ configuration1);
+ int bufferSize = MIN_BUFFER_SIZE;
+ final byte[] b = new byte[2 * bufferSize];
+ new Random().nextBytes(b);
+
+ Path testPath = path(TEST_PATH);
+
+ //Do an operation on the filesystem.
+ FSDataOutputStream stream = fs.create(testPath);
+
+ try {
+ stream.write(b);
+ } finally {
+ stream.close();
+ }
+
+ //Don't perform any operation on the account.
+ int sleepTime = (int) ((getAbfsConfig().getAccountOperationIdleTimeout()) * 1.5);
+ Thread.sleep(sleepTime);
+
+ FSDataInputStream streamRead = fs.open(testPath);
+ try {
+ streamRead.read(b);
+ } finally {
+ streamRead.close();
+ }
+
+ //Perform operations on another account.
+ AzureBlobFileSystem fs1 = new AzureBlobFileSystem();
+ Configuration config = new Configuration(getRawConfiguration());
+ String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+ final String abfsUrl1 = this.getFileSystemName() + "12" + "@" + accountName1;
+ URI defaultUri1 = null;
+ try {
+ defaultUri1 = new URI("abfss", abfsUrl1, null, null, null);
+ } catch (Exception ex) {
Review Comment:
no need to wrap as the test throws Exception.
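The suggested simplification, sketched with a hypothetical URL: because the test method already declares `throws Exception`, a `URISyntaxException` can simply propagate, and the catch-and-rethrow-as-AssertionError disappears.

```java
import java.net.URI;

public class UriNoWrapExample {
    // "throws Exception" here mirrors the test method's signature, so no
    // try/catch around the URI constructor is needed.
    public static void main(String[] args) throws Exception {
        String abfsUrl1 = "container12@testaccount.dfs.core.windows.net"; // placeholder
        URI defaultUri1 = new URI("abfss", abfsUrl1, null, null, null);
        System.out.println(defaultUri1);
    }
}
```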
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
For additional commands, e-mail: [email protected]