[ https://issues.apache.org/jira/browse/HADOOP-18457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17620364#comment-17620364 ]

ASF GitHub Bot commented on HADOOP-18457:
-----------------------------------------

steveloughran commented on code in PR #5034:
URL: https://github.com/apache/hadoop/pull/5034#discussion_r999378107


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java:
##########
@@ -32,6 +32,7 @@
 public final class FileSystemConfigurations {
 
   public static final String DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "";
+  public static final boolean DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = true;

Review Comment:
   1. add with the other changes in this file to make cherrypick easier.
   
   2. Why is this the default?
   



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -25,6 +25,7 @@
 import java.net.UnknownHostException;
 import java.util.List;
 
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;

Review Comment:
   check your IDE settings for import placement; this import should be in the next block



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -46,25 +48,49 @@ public final class AbfsClientThrottlingIntercept {
   private AbfsClientThrottlingAnalyzer readThrottler = null;
   private AbfsClientThrottlingAnalyzer writeThrottler = null;
   private static boolean isAutoThrottlingEnabled = false;
+  private String accountName = "";
+
+  private synchronized void setIsAutoThrottlingEnabled(boolean autoThrottlingEnabled) {
+    isAutoThrottlingEnabled = autoThrottlingEnabled;
+  }
 
   // Hide default constructor
-  private AbfsClientThrottlingIntercept() {
-    readThrottler = new AbfsClientThrottlingAnalyzer("read");
-    writeThrottler = new AbfsClientThrottlingAnalyzer("write");
+  public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsConfiguration) {
+    setIsAutoThrottlingEnabled(abfsConfiguration.isAutoThrottlingEnabled());
+    this.accountName = accountName;
+    this.readThrottler = setAnalyzer("read " + accountName, abfsConfiguration);
+    this.writeThrottler = setAnalyzer("write " + accountName, abfsConfiguration);
+    LOG.debug("Client-side throttling is enabled for the ABFS file system for the account : {}", accountName);
   }
 
-  public static synchronized void initializeSingleton(boolean enableAutoThrottling) {
-    if (!enableAutoThrottling) {
-      return;
-    }
+  // Hide default constructor
+  private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
+    readThrottler = setAnalyzer("read", abfsConfiguration);
+    writeThrottler = setAnalyzer("write", abfsConfiguration);
+  }
+
+  private AbfsClientThrottlingAnalyzer setAnalyzer(String name, AbfsConfiguration abfsConfiguration) {
+    return new AbfsClientThrottlingAnalyzer(name, abfsConfiguration);
+  }
+
+  AbfsClientThrottlingAnalyzer getReadThrottler() {
+    return readThrottler;
+  }
+
+  AbfsClientThrottlingAnalyzer getWriteThrottler() {
+    return writeThrottler;
+  }
+
+  public static synchronized AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsConfiguration) {

Review Comment:
   can you add javadocs for these methods? the hadoop-azure module is underdocumented, but that is no reason not to add some yourself
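
For illustration, the kind of javadoc being requested might look like this (a toy stub; the method name mirrors the diff, but the class and body are hypothetical):

```java
// Hypothetical stub showing the requested javadoc style; not the real ABFS class.
class JavadocExample {
    private final String readThrottler = "read";

    /**
     * Gets the analyzer used to throttle read operations.
     *
     * @return the read throttler, never null.
     */
    String getReadThrottler() {
        return readThrottler;
    }
}
```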



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -20,12 +20,14 @@
 
 import java.net.HttpURLConnection;
 
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;

Review Comment:
   import ordering wrong



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -278,8 +284,9 @@ private boolean executeHttpOperation(final int retryCount,
       // dump the headers
       AbfsIoUtils.dumpHeadersToDebugLog("Request Headers",
           httpOperation.getConnection().getRequestProperties());
-      AbfsClientThrottlingIntercept.sendingRequest(operationType, abfsCounters);
-
+      if (intercept != null) {

Review Comment:
   by having the abfsclient/analyzer factory return a `NoopAbfsClientThrottlingIntercept` interceptor when interception is disabled, there will be no need for this check



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -46,25 +48,49 @@ public final class AbfsClientThrottlingIntercept {
   private AbfsClientThrottlingAnalyzer readThrottler = null;
   private AbfsClientThrottlingAnalyzer writeThrottler = null;
   private static boolean isAutoThrottlingEnabled = false;
+  private String accountName = "";
+
+  private synchronized void setIsAutoThrottlingEnabled(boolean autoThrottlingEnabled) {
+    isAutoThrottlingEnabled = autoThrottlingEnabled;
+  }
 
   // Hide default constructor
-  private AbfsClientThrottlingIntercept() {
-    readThrottler = new AbfsClientThrottlingAnalyzer("read");
-    writeThrottler = new AbfsClientThrottlingAnalyzer("write");
+  public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsConfiguration) {

Review Comment:
   factor this out into the interface `AbfsClientThrottlingIntercept` and implementation `AnalyzingAbfsClientThrottlingIntercept`. Then add a no-op intercept, the `NoopAbfsClientThrottlingIntercept`, which can be returned from lookups when intercept is off, ensuring there's no need to check for null values in the rest operation
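
A minimal sketch of the suggested interface / no-op split, with hypothetical names standing in for the real ABFS types:

```java
// Illustrative only: names and signatures are stand-ins, not the Hadoop API.
interface ThrottlingIntercept {
    void sendingRequest();
}

// No-op implementation returned when throttling is disabled; callers can
// invoke it unconditionally, so no null checks are needed.
final class NoopThrottlingIntercept implements ThrottlingIntercept {
    static final NoopThrottlingIntercept INSTANCE = new NoopThrottlingIntercept();
    private NoopThrottlingIntercept() { }
    @Override
    public void sendingRequest() {
        // deliberately empty
    }
}

// Real implementation that would run the throttling analysis.
final class AnalyzingThrottlingIntercept implements ThrottlingIntercept {
    @Override
    public void sendingRequest() {
        // throttling analysis would happen here
    }
}

final class InterceptFactory {
    // Never returns null: disabled lookups get the shared no-op instance.
    static ThrottlingIntercept get(boolean throttlingEnabled) {
        return throttlingEnabled
            ? new AnalyzingThrottlingIntercept()
            : NoopThrottlingIntercept.INSTANCE;
    }
}
```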



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOperationMetrics.java:
##########
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Stores Abfs operation metrics during each analysis period.
+ */
+class AbfsOperationMetrics {

Review Comment:
   have getters return the actual long value; add methods to add() each counter type and invoke them. this keeps the use of atomic longs an implementation detail
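
An illustrative sketch (not the actual Hadoop class) of what that encapsulation looks like: the `AtomicLong` fields stay private, and callers see plain `long` getters plus add/increment mutators:

```java
import java.util.concurrent.atomic.AtomicLong;

// Toy version of the metrics holder; field names mirror the diff but the
// class itself is hypothetical.
class OperationMetrics {
    private final AtomicLong bytesFailed = new AtomicLong();
    private final AtomicLong operationsFailed = new AtomicLong();

    long getBytesFailed() {
        return bytesFailed.get();       // callers see a long, not the AtomicLong
    }

    long getOperationsFailed() {
        return operationsFailed.get();
    }

    void addBytesFailed(long count) {
        bytesFailed.addAndGet(count);
    }

    void incrementOperationsFailed() {
        operationsFailed.incrementAndGet();
    }
}
```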



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -242,6 +243,11 @@ private void completeExecute(TracingContext tracingContext)
   private boolean executeHttpOperation(final int retryCount,
     TracingContext tracingContext) throws AzureBlobFileSystemException {
     AbfsHttpOperation httpOperation = null;
+    AbfsConfiguration abfsConfiguration = client.getAbfsConfiguration();
+    String accountName = abfsConfiguration.getAccountName();
+    AbfsClientThrottlingIntercept intercept
+        = AbfsClientThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration);

Review Comment:
   nit, put the = on the line above
   



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -67,6 +87,130 @@ public void testDefaultMaxIORetryCount() throws Exception {
     testMaxIOConfig(abfsConfig);
   }
 
+  @Test
+  public void testCreateMultipleAccountThrottling() throws Exception {
+    Configuration config = new Configuration(getRawConfiguration());
+    String accountName = config.get(FS_AZURE_ACCOUNT_NAME);
+    if (accountName == null) {
+      // check if accountName is set using different config key
+      accountName = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+    }
+    assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
+        accountName != null && !accountName.isEmpty());
+
+    Configuration rawConfig1 = new Configuration();
+    rawConfig1.addResource(TEST_CONFIGURATION_FILE_NAME);
+
+    AbfsRestOperation successOp = mock(AbfsRestOperation.class);
+    AbfsHttpOperation http500Op = mock(AbfsHttpOperation.class);
+    when(http500Op.getStatusCode()).thenReturn(HTTP_INTERNAL_ERROR);
+    when(successOp.getResult()).thenReturn(http500Op);
+
+    AbfsConfiguration configuration = Mockito.mock(AbfsConfiguration.class);
+    when(configuration.getAnalysisPeriod()).thenReturn(10000);
+    when(configuration.isAutoThrottlingEnabled()).thenReturn(true);
+    when(configuration.isAccountThrottlingEnabled()).thenReturn(false);
+
+    AbfsClientThrottlingIntercept instance1 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+    String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+
+    AbfsClientThrottlingIntercept instance2 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+    //if singleton is enabled, for different accounts both the instances should return same value
+    assertEquals(instance1, instance2);
+
+    when(configuration.isAccountThrottlingEnabled()).thenReturn(true);
+    AbfsClientThrottlingIntercept instance3 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+    AbfsClientThrottlingIntercept instance4 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+    AbfsClientThrottlingIntercept instance5 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+    //if singleton is not enabled, for different accounts instances should return different value
+    assertNotEquals(instance3, instance4);
+
+    //if singleton is not enabled, for same accounts instances should return same value
+    assertEquals(instance3, instance5);
+  }
+
+  @Test
+  public void testOperationOnAccountIdle() throws Exception {
+    //Get the filesystem.
+    AzureBlobFileSystem fs = getFileSystem();
+    AbfsConfiguration configuration1 = fs.getAbfsStore().getClient().getAbfsConfiguration();
+    Assume.assumeTrue(configuration1.isAccountThrottlingEnabled());
+
+    AbfsClientThrottlingIntercept accountIntercept = AbfsClientThrottlingInterceptFactory.getInstance(this.getAccountName(),
+            configuration1);
+    int bufferSize = MIN_BUFFER_SIZE;
+    final byte[] b = new byte[2 * bufferSize];
+    new Random().nextBytes(b);
+
+    Path testPath = path(TEST_PATH);
+
+    //Do an operation on the filesystem.
+    FSDataOutputStream stream = fs.create(testPath);

Review Comment:
   use try-with-resources
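
The suggested change, as a self-contained illustration using stdlib streams rather than the ABFS ones: try-with-resources closes the stream automatically, replacing the manual try/finally seen in the test.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Toy demo of try-with-resources; the class and method are hypothetical.
class TryWithResourcesDemo {
    static int writeAll(byte[] data) {
        // close() runs automatically at the end of the block, even if
        // write() throws, because OutputStream implements AutoCloseable.
        try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
            stream.write(data);
            return stream.size();
        } catch (IOException e) {
            // ByteArrayOutputStream declares but never actually throws this
            throw new UncheckedIOException(e);
        }
    }
}
```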



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -46,25 +48,49 @@ public final class AbfsClientThrottlingIntercept {
   private AbfsClientThrottlingAnalyzer readThrottler = null;
   private AbfsClientThrottlingAnalyzer writeThrottler = null;
   private static boolean isAutoThrottlingEnabled = false;
+  private String accountName = "";
+
+  private synchronized void setIsAutoThrottlingEnabled(boolean autoThrottlingEnabled) {

Review Comment:
   use setAutoThrottlingEnabled, as it reads better. maybe rename the field to match



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -67,6 +87,130 @@ public void testDefaultMaxIORetryCount() throws Exception {
     testMaxIOConfig(abfsConfig);
   }
 
+  @Test
+  public void testCreateMultipleAccountThrottling() throws Exception {
+    Configuration config = new Configuration(getRawConfiguration());
+    String accountName = config.get(FS_AZURE_ACCOUNT_NAME);
+    if (accountName == null) {
+      // check if accountName is set using different config key
+      accountName = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+    }
+    assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
+        accountName != null && !accountName.isEmpty());
+
+    Configuration rawConfig1 = new Configuration();
+    rawConfig1.addResource(TEST_CONFIGURATION_FILE_NAME);
+
+    AbfsRestOperation successOp = mock(AbfsRestOperation.class);
+    AbfsHttpOperation http500Op = mock(AbfsHttpOperation.class);
+    when(http500Op.getStatusCode()).thenReturn(HTTP_INTERNAL_ERROR);
+    when(successOp.getResult()).thenReturn(http500Op);
+
+    AbfsConfiguration configuration = Mockito.mock(AbfsConfiguration.class);
+    when(configuration.getAnalysisPeriod()).thenReturn(10000);
+    when(configuration.isAutoThrottlingEnabled()).thenReturn(true);
+    when(configuration.isAccountThrottlingEnabled()).thenReturn(false);
+
+    AbfsClientThrottlingIntercept instance1 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+    String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+
+    AbfsClientThrottlingIntercept instance2 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+    //if singleton is enabled, for different accounts both the instances should return same value
+    assertEquals(instance1, instance2);
+
+    when(configuration.isAccountThrottlingEnabled()).thenReturn(true);
+    AbfsClientThrottlingIntercept instance3 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+    AbfsClientThrottlingIntercept instance4 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+    AbfsClientThrottlingIntercept instance5 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+    //if singleton is not enabled, for different accounts instances should return different value
+    assertNotEquals(instance3, instance4);
+
+    //if singleton is not enabled, for same accounts instances should return same value
+    assertEquals(instance3, instance5);
+  }
+
+  @Test
+  public void testOperationOnAccountIdle() throws Exception {
+    //Get the filesystem.
+    AzureBlobFileSystem fs = getFileSystem();
+    AbfsConfiguration configuration1 = fs.getAbfsStore().getClient().getAbfsConfiguration();
+    Assume.assumeTrue(configuration1.isAccountThrottlingEnabled());
+
+    AbfsClientThrottlingIntercept accountIntercept = AbfsClientThrottlingInterceptFactory.getInstance(this.getAccountName(),
+            configuration1);
+    int bufferSize = MIN_BUFFER_SIZE;
+    final byte[] b = new byte[2 * bufferSize];
+    new Random().nextBytes(b);
+
+    Path testPath = path(TEST_PATH);
+
+    //Do an operation on the filesystem.
+    FSDataOutputStream stream = fs.create(testPath);
+
+    try {
+      stream.write(b);
+    } finally{
+      stream.close();
+    }
+
+    //Don't perform any operation on the account.
+    int sleepTime = (int)((getAbfsConfig().getAccountOperationIdleTimeout()) * 1.5);
+    Thread.sleep(sleepTime);
+
+    FSDataInputStream streamRead = fs.open(testPath);
+    try {
+      streamRead.read(b);
+    } finally{
+      streamRead.close();
+    }
+
+    //Perform operations on another account.
+    AzureBlobFileSystem fs1 = new AzureBlobFileSystem();
+    Configuration config = new Configuration(getRawConfiguration());
+    String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+    final String abfsUrl1 = this.getFileSystemName() + "12" + "@" + accountName1;
+    URI defaultUri1 = null;
+    try {
+      defaultUri1 = new URI("abfss", abfsUrl1, null, null, null);
+    } catch (Exception ex) {
+      throw new AssertionError(ex);
+    }
+    fs1.initialize(defaultUri1, getRawConfiguration());
+    AbfsConfiguration configuration2 = fs1.getAbfsStore().getClient().getAbfsConfiguration();
+    AbfsClientThrottlingIntercept accountIntercept1 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1,
+            configuration2);
+    FSDataOutputStream stream1 = fs1.create(testPath);
+    try {
+      stream1.write(b);
+    } finally{
+      stream1.close();
+    }
+
+    //Verify the write analyzer for first account is idle but the read analyzer is not idle.
+    if (accountIntercept != null){
+      Assert.assertTrue(accountIntercept.getWriteThrottler().getIsOperationOnAccountIdle().get());
+      Assert.assertFalse(accountIntercept.getReadThrottler().getIsOperationOnAccountIdle().get());

Review Comment:
   adding static imports of these asserts would be good



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -46,25 +48,49 @@ public final class AbfsClientThrottlingIntercept {
   private AbfsClientThrottlingAnalyzer readThrottler = null;
   private AbfsClientThrottlingAnalyzer writeThrottler = null;
   private static boolean isAutoThrottlingEnabled = false;
+  private String accountName = "";

Review Comment:
   make final and set in both constructors



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -20,12 +20,14 @@
 
 import java.net.HttpURLConnection;
 
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 
+

Review Comment:
   cut to ease cherrypicking



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -46,25 +48,49 @@ public final class AbfsClientThrottlingIntercept {
   private AbfsClientThrottlingAnalyzer readThrottler = null;
   private AbfsClientThrottlingAnalyzer writeThrottler = null;
   private static boolean isAutoThrottlingEnabled = false;
+  private String accountName = "";
+
+  private synchronized void setIsAutoThrottlingEnabled(boolean autoThrottlingEnabled) {
+    isAutoThrottlingEnabled = autoThrottlingEnabled;
+  }
 
   // Hide default constructor
-  private AbfsClientThrottlingIntercept() {
-    readThrottler = new AbfsClientThrottlingAnalyzer("read");
-    writeThrottler = new AbfsClientThrottlingAnalyzer("write");
+  public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsConfiguration) {
+    setIsAutoThrottlingEnabled(abfsConfiguration.isAutoThrottlingEnabled());
+    this.accountName = accountName;
+    this.readThrottler = setAnalyzer("read " + accountName, abfsConfiguration);
+    this.writeThrottler = setAnalyzer("write " + accountName, abfsConfiguration);
+    LOG.debug("Client-side throttling is enabled for the ABFS file system for the account : {}", accountName);
   }
 
-  public static synchronized void initializeSingleton(boolean enableAutoThrottling) {
-    if (!enableAutoThrottling) {
-      return;
-    }
+  // Hide default constructor
+  private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
+    readThrottler = setAnalyzer("read", abfsConfiguration);
+    writeThrottler = setAnalyzer("write", abfsConfiguration);
+  }
+
+  private AbfsClientThrottlingAnalyzer setAnalyzer(String name, AbfsConfiguration abfsConfiguration) {
+    return new AbfsClientThrottlingAnalyzer(name, abfsConfiguration);
+  }
+
+  AbfsClientThrottlingAnalyzer getReadThrottler() {
+    return readThrottler;
+  }
+
+  AbfsClientThrottlingAnalyzer getWriteThrottler() {
+    return writeThrottler;
+  }
+
+  public static synchronized AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsConfiguration) {
     if (singleton == null) {

Review Comment:
   how often is this method invoked?
   it may be better to defer the synchronized block until after the singleton==null check, so if it is non null the current value is returned without any lock acquisition.
   if you do that, a double check is needed...other bits of the code do this if you look
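
The double-checked pattern the comment refers to, as a generic sketch (names are illustrative; the field must be `volatile` for safe publication):

```java
// Illustrative double-checked lazy initialization, not the ABFS code itself.
final class Singleton {
    private static volatile Singleton instance;

    private Singleton() { }

    static Singleton get() {
        Singleton local = instance;
        if (local == null) {                   // fast path: no lock once initialized
            synchronized (Singleton.class) {
                local = instance;
                if (local == null) {           // second check, now under the lock
                    local = new Singleton();
                    instance = local;
                }
            }
        }
        return local;
    }
}
```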



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java:
##########
@@ -117,6 +117,10 @@
       DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
   private boolean optimizeFooterRead;
 
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED,
+      DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED)
+  private boolean isAccountThrottlingEnabled;

Review Comment:
   cut `is` from the boolean



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingInterceptFactory.java:
##########
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to get an instance of throttling intercept class per account.
+ */
+final class AbfsClientThrottlingInterceptFactory {
+
+  private AbfsClientThrottlingInterceptFactory() {
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsClientThrottlingInterceptFactory.class);
+
+  // Map which stores instance of ThrottlingIntercept class per account
+  private static Map<String, AbfsClientThrottlingIntercept> instanceMapping
+      = new ConcurrentHashMap<>();

Review Comment:
   -1 to this. it will leak memory, especially on long lived hive/spark/impala processes. see #4966 for a recent problem there.
   
   options
   
   1. use `org.apache.hadoop.util.WeakReferenceMap`
   
   2. have a per FS instance value and pass it down. i would prefer this.
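
A stdlib-only sketch of the idea behind option 1: hold cached values through `WeakReference` so an intercept can be garbage collected once no filesystem instance references it. (The actual suggestion is `org.apache.hadoop.util.WeakReferenceMap`; this illustrates only the concept.)

```java
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical weak-value cache; the real WeakReferenceMap differs in API.
class WeakValueCache<K, V> {
    private final Map<K, WeakReference<V>> map = new ConcurrentHashMap<>();

    V get(K key, Function<K, V> factory) {
        WeakReference<V> ref = map.get(key);
        V value = (ref == null) ? null : ref.get();
        if (value == null) {               // absent, or already collected
            value = factory.apply(key);
            map.put(key, new WeakReference<>(value));
        }
        return value;
    }
}
```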
   
   



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -242,6 +243,11 @@ private void completeExecute(TracingContext tracingContext)
   private boolean executeHttpOperation(final int retryCount,
     TracingContext tracingContext) throws AzureBlobFileSystemException {
     AbfsHttpOperation httpOperation = null;
+    AbfsConfiguration abfsConfiguration = client.getAbfsConfiguration();
+    String accountName = abfsConfiguration.getAccountName();
+    AbfsClientThrottlingIntercept intercept

Review Comment:
   add the throttling intercept to the abfsClient so it is shared, rather than looked up on every api call.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java:
##########
@@ -44,28 +47,20 @@ class AbfsClientThrottlingAnalyzer {
   private static final double SLEEP_DECREASE_FACTOR = .975;
   private static final double SLEEP_INCREASE_FACTOR = 1.05;
   private int analysisPeriodMs;
-

Review Comment:
   revert the line cut; makes backporting harder



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java:
##########
@@ -104,19 +113,24 @@ private AbfsClientThrottlingAnalyzer() {
   public void addBytesTransferred(long count, boolean isFailedOperation) {
     AbfsOperationMetrics metrics = blobMetrics.get();
     if (isFailedOperation) {
-      metrics.bytesFailed.addAndGet(count);
-      metrics.operationsFailed.incrementAndGet();
+      metrics.getBytesFailed().addAndGet(count);
+      metrics.getOperationsFailed().incrementAndGet();
     } else {
-      metrics.bytesSuccessful.addAndGet(count);
-      metrics.operationsSuccessful.incrementAndGet();
+      metrics.getBytesSuccessful().addAndGet(count);
+      metrics.getOperationsSuccessful().incrementAndGet();
     }
+    blobMetrics.set(metrics);
   }
 
   /**
   * Suspends the current storage operation, as necessary, to reduce throughput.
    * @return true if Thread sleeps(Throttling occurs) else false.
    */
   public boolean suspendIfNecessary() {
+    if (isOperationOnAccountIdle.get()) {

Review Comment:
   somehow i want iostatistics to track time spent sleeping here. if a DurationTrackerFactory is passed in to the analyzer along with a key to update, then this can be added now/later



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java:
##########
@@ -18,9 +18,15 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.junit.Test;
 
+import java.io.IOException;

Review Comment:
   add in a block above the org.apache ones



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java:
##########
@@ -104,19 +113,24 @@ private AbfsClientThrottlingAnalyzer() {
   public void addBytesTransferred(long count, boolean isFailedOperation) {
     AbfsOperationMetrics metrics = blobMetrics.get();
     if (isFailedOperation) {
-      metrics.bytesFailed.addAndGet(count);
-      metrics.operationsFailed.incrementAndGet();
+      metrics.getBytesFailed().addAndGet(count);

Review Comment:
   exposes too much of the metrics internals. better to have 
`addBytesFailed(long)` method and similar in AbfsOperationMetrics and invoke 
these here



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -67,6 +87,130 @@ public void testDefaultMaxIORetryCount() throws Exception {
     testMaxIOConfig(abfsConfig);
   }
 
+  @Test
+  public void testCreateMultipleAccountThrottling() throws Exception {
+    Configuration config = new Configuration(getRawConfiguration());
+    String accountName = config.get(FS_AZURE_ACCOUNT_NAME);
+    if (accountName == null) {
+      // check if accountName is set using different config key
+      accountName = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+    }
+    assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
+        accountName != null && !accountName.isEmpty());
+
+    Configuration rawConfig1 = new Configuration();
+    rawConfig1.addResource(TEST_CONFIGURATION_FILE_NAME);
+
+    AbfsRestOperation successOp = mock(AbfsRestOperation.class);
+    AbfsHttpOperation http500Op = mock(AbfsHttpOperation.class);
+    when(http500Op.getStatusCode()).thenReturn(HTTP_INTERNAL_ERROR);
+    when(successOp.getResult()).thenReturn(http500Op);
+
+    AbfsConfiguration configuration = Mockito.mock(AbfsConfiguration.class);
+    when(configuration.getAnalysisPeriod()).thenReturn(10000);
+    when(configuration.isAutoThrottlingEnabled()).thenReturn(true);
+    when(configuration.isAccountThrottlingEnabled()).thenReturn(false);
+
+    AbfsClientThrottlingIntercept instance1 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+    String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+
+    AbfsClientThrottlingIntercept instance2 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+    //if singleton is enabled, for different accounts both the instances should return same value
+    assertEquals(instance1, instance2);
+
+    when(configuration.isAccountThrottlingEnabled()).thenReturn(true);
+    AbfsClientThrottlingIntercept instance3 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+    AbfsClientThrottlingIntercept instance4 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+    AbfsClientThrottlingIntercept instance5 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+    //if singleton is not enabled, for different accounts instances should return different value
+    assertNotEquals(instance3, instance4);
+
+    //if singleton is not enabled, for same accounts instances should return same value
+    assertEquals(instance3, instance5);
+  }
+
+  @Test
+  public void testOperationOnAccountIdle() throws Exception {
+    //Get the filesystem.
+    AzureBlobFileSystem fs = getFileSystem();
+    AbfsConfiguration configuration1 = fs.getAbfsStore().getClient().getAbfsConfiguration();
+    Assume.assumeTrue(configuration1.isAccountThrottlingEnabled());
+
+    AbfsClientThrottlingIntercept accountIntercept = AbfsClientThrottlingInterceptFactory.getInstance(this.getAccountName(),
+            configuration1);
+    int bufferSize = MIN_BUFFER_SIZE;
+    final byte[] b = new byte[2 * bufferSize];
+    new Random().nextBytes(b);
+
+    Path testPath = path(TEST_PATH);
+
+    //Do an operation on the filesystem.
+    FSDataOutputStream stream = fs.create(testPath);
+
+    try {
+      stream.write(b);
+    } finally{
+      stream.close();
+    }
+
+    //Don't perform any operation on the account.
+    int sleepTime = (int)((getAbfsConfig().getAccountOperationIdleTimeout()) * 1.5);
+    Thread.sleep(sleepTime);
+
+    FSDataInputStream streamRead = fs.open(testPath);

Review Comment:
   use try-with-resources



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -67,6 +87,130 @@ public void testDefaultMaxIORetryCount() throws Exception {
     testMaxIOConfig(abfsConfig);
   }
 
+  @Test
+  public void testCreateMultipleAccountThrottling() throws Exception {
+    Configuration config = new Configuration(getRawConfiguration());
+    String accountName = config.get(FS_AZURE_ACCOUNT_NAME);
+    if (accountName == null) {
+      // check if accountName is set using different config key
+      accountName = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+    }
+    assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
+        accountName != null && !accountName.isEmpty());
+
+    Configuration rawConfig1 = new Configuration();
+    rawConfig1.addResource(TEST_CONFIGURATION_FILE_NAME);
+
+    AbfsRestOperation successOp = mock(AbfsRestOperation.class);
+    AbfsHttpOperation http500Op = mock(AbfsHttpOperation.class);
+    when(http500Op.getStatusCode()).thenReturn(HTTP_INTERNAL_ERROR);
+    when(successOp.getResult()).thenReturn(http500Op);
+
+    AbfsConfiguration configuration = Mockito.mock(AbfsConfiguration.class);
+    when(configuration.getAnalysisPeriod()).thenReturn(10000);
+    when(configuration.isAutoThrottlingEnabled()).thenReturn(true);
+    when(configuration.isAccountThrottlingEnabled()).thenReturn(false);
+
+    AbfsClientThrottlingIntercept instance1 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+    String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+
+    AbfsClientThrottlingIntercept instance2 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+    //if singleton is enabled, for different accounts both the instances should return same value
+    assertEquals(instance1, instance2);
+
+    when(configuration.isAccountThrottlingEnabled()).thenReturn(true);
+    AbfsClientThrottlingIntercept instance3 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+    AbfsClientThrottlingIntercept instance4 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+    AbfsClientThrottlingIntercept instance5 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+    //if singleton is not enabled, for different accounts instances should return different value
+    assertNotEquals(instance3, instance4);
+
+    //if singleton is not enabled, for same accounts instances should return same value
+    assertEquals(instance3, instance5);
+  }
+
+  @Test
+  public void testOperationOnAccountIdle() throws Exception {
+    //Get the filesystem.
+    AzureBlobFileSystem fs = getFileSystem();
+    AbfsConfiguration configuration1 = fs.getAbfsStore().getClient().getAbfsConfiguration();
+    Assume.assumeTrue(configuration1.isAccountThrottlingEnabled());
+
+    AbfsClientThrottlingIntercept accountIntercept = AbfsClientThrottlingInterceptFactory.getInstance(this.getAccountName(),
+            configuration1);
+    int bufferSize = MIN_BUFFER_SIZE;
+    final byte[] b = new byte[2 * bufferSize];
+    new Random().nextBytes(b);
+
+    Path testPath = path(TEST_PATH);
+
+    //Do an operation on the filesystem.
+    FSDataOutputStream stream = fs.create(testPath);
+
+    try {
+      stream.write(b);
+    } finally{
+      stream.close();
+    }
+
+    //Don't perform any operation on the account.
+    int sleepTime = (int)((getAbfsConfig().getAccountOperationIdleTimeout()) * 1.5);
+    Thread.sleep(sleepTime);
+
+    FSDataInputStream streamRead = fs.open(testPath);
+    try {
+      streamRead.read(b);
+    } finally{
+      streamRead.close();
+    }
+
+    //Perform operations on another account.
+    AzureBlobFileSystem fs1 = new AzureBlobFileSystem();
+    Configuration config = new Configuration(getRawConfiguration());
+    String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+    final String abfsUrl1 = this.getFileSystemName() + "12" + "@" + accountName1;
+    URI defaultUri1 = null;
+    try {
+      defaultUri1 = new URI("abfss", abfsUrl1, null, null, null);
+    } catch (Exception ex) {
+      throw new AssertionError(ex);
+    }
+    fs1.initialize(defaultUri1, getRawConfiguration());
+    AbfsConfiguration configuration2 = fs1.getAbfsStore().getClient().getAbfsConfiguration();
+    AbfsClientThrottlingIntercept accountIntercept1 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1,
+            configuration2);
+    FSDataOutputStream stream1 = fs1.create(testPath);
+    try {
+      stream1.write(b);
+    } finally{
+      stream1.close();
+    }
+
+    //Verify the write analyzer for the first account is idle but the read analyzer is not idle.
+    if (accountIntercept != null){
+      Assert.assertTrue(accountIntercept.getWriteThrottler().getIsOperationOnAccountIdle().get());

Review Comment:
   add a message on all these asserts. Think: "if this assert failed on a Jenkins run, what would I need to know?"
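The suggestion can be sketched with the two-argument `assertTrue(String, boolean)` form. This is a hypothetical standalone example: the tiny `assertTrue` helper stands in for JUnit's `Assert.assertTrue`, and the boolean flags stand in for the `getIsOperationOnAccountIdle().get()` calls from the quoted test.

```java
public class AssertMessageSketch {

    // Minimal stand-in for JUnit's Assert.assertTrue(String, boolean):
    // the message only surfaces when the condition is false.
    static void assertTrue(String message, boolean condition) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        boolean writeIdle = true;  // stands in for getIsOperationOnAccountIdle().get()
        assertTrue("expected write analyzer of first account to be idle", writeIdle);

        try {
            assertTrue("expected read analyzer of first account to be idle", false);
        } catch (AssertionError e) {
            // A Jenkins log now explains what failed, not just the line number.
            System.out.println(e.getMessage());
        }
    }
}
```

In the quoted test this would read, e.g., `Assert.assertTrue("expected write analyzer of first account to be idle", accountIntercept.getWriteThrottler().getIsOperationOnAccountIdle().get());`.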



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -67,6 +87,130 @@ public void testDefaultMaxIORetryCount() throws Exception {
     testMaxIOConfig(abfsConfig);
   }
 
+  @Test
+  public void testCreateMultipleAccountThrottling() throws Exception {
+    Configuration config = new Configuration(getRawConfiguration());
+    String accountName = config.get(FS_AZURE_ACCOUNT_NAME);
+    if (accountName == null) {
+      // check if accountName is set using different config key
+      accountName = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+    }
+    assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
+        accountName != null && !accountName.isEmpty());
+
+    Configuration rawConfig1 = new Configuration();
+    rawConfig1.addResource(TEST_CONFIGURATION_FILE_NAME);
+
+    AbfsRestOperation successOp = mock(AbfsRestOperation.class);
+    AbfsHttpOperation http500Op = mock(AbfsHttpOperation.class);
+    when(http500Op.getStatusCode()).thenReturn(HTTP_INTERNAL_ERROR);
+    when(successOp.getResult()).thenReturn(http500Op);
+
+    AbfsConfiguration configuration = Mockito.mock(AbfsConfiguration.class);
+    when(configuration.getAnalysisPeriod()).thenReturn(10000);
+    when(configuration.isAutoThrottlingEnabled()).thenReturn(true);
+    when(configuration.isAccountThrottlingEnabled()).thenReturn(false);
+
+    AbfsClientThrottlingIntercept instance1 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+    String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+
+    AbfsClientThrottlingIntercept instance2 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+    //if singleton is enabled, for different accounts both the instances should return same value
+    assertEquals(instance1, instance2);
+
+    when(configuration.isAccountThrottlingEnabled()).thenReturn(true);
+    AbfsClientThrottlingIntercept instance3 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+    AbfsClientThrottlingIntercept instance4 = AbfsClientThrottlingInterceptFactory.getInstance(accountName1, configuration);
+    AbfsClientThrottlingIntercept instance5 = AbfsClientThrottlingInterceptFactory.getInstance(accountName, configuration);
+    //if singleton is not enabled, for different accounts instances should return different value
+    assertNotEquals(instance3, instance4);
+
+    //if singleton is not enabled, for same accounts instances should return same value
+    assertEquals(instance3, instance5);
+  }
+
+  @Test
+  public void testOperationOnAccountIdle() throws Exception {
+    //Get the filesystem.
+    AzureBlobFileSystem fs = getFileSystem();
+    AbfsConfiguration configuration1 = fs.getAbfsStore().getClient().getAbfsConfiguration();
+    Assume.assumeTrue(configuration1.isAccountThrottlingEnabled());
+
+    AbfsClientThrottlingIntercept accountIntercept = AbfsClientThrottlingInterceptFactory.getInstance(this.getAccountName(),
+            configuration1);
+    int bufferSize = MIN_BUFFER_SIZE;
+    final byte[] b = new byte[2 * bufferSize];
+    new Random().nextBytes(b);
+
+    Path testPath = path(TEST_PATH);
+
+    //Do an operation on the filesystem.
+    FSDataOutputStream stream = fs.create(testPath);
+
+    try {
+      stream.write(b);
+    } finally{
+      stream.close();
+    }
+
+    //Don't perform any operation on the account.
+    int sleepTime = (int)((getAbfsConfig().getAccountOperationIdleTimeout()) * 1.5);
+    Thread.sleep(sleepTime);
+
+    FSDataInputStream streamRead = fs.open(testPath);
+    try {
+      streamRead.read(b);
+    } finally{
+      streamRead.close();
+    }
+
+    //Perform operations on another account.
+    AzureBlobFileSystem fs1 = new AzureBlobFileSystem();
+    Configuration config = new Configuration(getRawConfiguration());
+    String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
+    final String abfsUrl1 = this.getFileSystemName() + "12" + "@" + accountName1;
+    URI defaultUri1 = null;
+    try {
+      defaultUri1 = new URI("abfss", abfsUrl1, null, null, null);
+    } catch (Exception ex) {

Review Comment:
   no need to wrap as the test throws Exception.
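The simplification being suggested can be sketched like this. It is a hypothetical standalone example; the class and method names are illustrative, and the authority string mimics the `abfsUrl1` built in the quoted test. Because the caller declares a checked exception (the test method declares `throws Exception`), `URISyntaxException` can simply propagate without a catch block re-wrapping it in an `AssertionError`.

```java
import java.net.URI;
import java.net.URISyntaxException;

public class UriNoWrapSketch {

    // No try/catch needed: the checked URISyntaxException propagates
    // to a caller that already declares "throws Exception".
    static URI buildAbfssUri(String authority) throws URISyntaxException {
        return new URI("abfss", authority, null, null, null);
    }

    public static void main(String[] args) throws URISyntaxException {
        System.out.println(buildAbfssUri("container12@account.dfs.core.windows.net"));
        // prints abfss://container12@account.dfs.core.windows.net
    }
}
```

In the quoted test this collapses the whole try/catch into a single line: `URI defaultUri1 = new URI("abfss", abfsUrl1, null, null, null);`.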





> ABFS: Support for account level throttling
> ------------------------------------------
>
>                 Key: HADOOP-18457
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18457
>             Project: Hadoop Common
>          Issue Type: Sub-task
>    Affects Versions: 3.3.4
>            Reporter: Anmol Asrani
>            Assignee: Anmol Asrani
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 3.4.0
>
>
> To add support for throttling at account level



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
