[ 
https://issues.apache.org/jira/browse/HADOOP-18457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17627697#comment-17627697
 ] 

ASF GitHub Bot commented on HADOOP-18457:
-----------------------------------------

pranavsaxena-microsoft commented on code in PR #5034:
URL: https://github.com/apache/hadoop/pull/5034#discussion_r1011670326


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java:
##########
@@ -95,6 +95,18 @@ private AbfsClientThrottlingAnalyzer() {
         analysisPeriodMs);
   }
 
+  /**
+   * Resumes the timer if it was stopped.
+   */
+  public void resumeTimer() {

Review Comment:
   Let's make this private.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -216,6 +218,10 @@ SharedKeyCredentials getSharedKeyCredentials() {
     return sharedKeyCredentials;
   }
 
+  public AbfsThrottlingIntercept getIntercept() {

Review Comment:
   Let's give this default (package-private) access. As a public method it can
   be called by an end developer, who might then call updateMetrics in a way
   that stops our client from throttling properly.
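
   To illustrate the access-level point, here is a minimal sketch. ClientSketch
   and InterceptSketch are hypothetical stand-ins, not the real AbfsClient or
   AbfsThrottlingIntercept; the only thing shown is that an accessor declared
   without a modifier is reachable from its own package but not from
   application code in other packages.

```java
// Hypothetical stand-in for the intercept; not the real AbfsThrottlingIntercept.
class InterceptSketch {
  private int updates;

  // Mutating call that outside code should not be able to reach.
  void updateMetrics() {
    updates++;
  }

  int getUpdates() {
    return updates;
  }
}

// Hypothetical stand-in for the client; not the real AbfsClient.
class ClientSketch {
  private final InterceptSketch intercept = new InterceptSketch();

  // No access modifier: package-private, so only classes in the same
  // package (for example the rest operation code and tests) can call it.
  InterceptSketch getIntercept() {
    return intercept;
  }
}

class ClientSketchDemo {
  public static void main(String[] args) {
    ClientSketch client = new ClientSketch();
    // Compiles because this class is in the same package as ClientSketch.
    client.getIntercept().updateMetrics();
    System.out.println("updates seen: " + client.getIntercept().getUpdates());
  }
}
```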



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java:
##########
@@ -104,19 +113,24 @@ private AbfsClientThrottlingAnalyzer() {
   public void addBytesTransferred(long count, boolean isFailedOperation) {
     AbfsOperationMetrics metrics = blobMetrics.get();
     if (isFailedOperation) {
-      metrics.bytesFailed.addAndGet(count);
-      metrics.operationsFailed.incrementAndGet();
+      metrics.getBytesFailed().addAndGet(count);
+      metrics.getOperationsFailed().incrementAndGet();
     } else {
-      metrics.bytesSuccessful.addAndGet(count);
-      metrics.operationsSuccessful.incrementAndGet();
+      metrics.getBytesSuccessful().addAndGet(count);
+      metrics.getOperationsSuccessful().incrementAndGet();
     }
+    blobMetrics.set(metrics);
   }
 
   /**
    * Suspends the current storage operation, as necessary, to reduce throughput.
    * @return true if Thread sleeps(Throttling occurs) else false.
    */
   public boolean suspendIfNecessary() {
+    if (isOperationOnAccountIdle.get()) {
+      resumeTimer();
+    }

Review Comment:
   A better approach, I think (it only just occurred to me), is to have a
   synchronized block guard every access to isOperationOnAccountIdle, whether
   for suspend or for resume, because race conditions can still happen. For
   example:
   1. https://github.com/anmolanmol1234/hadoop/blob/HADOOP-18457_temp/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java#L266
   2. https://github.com/anmolanmol1234/hadoop/blob/HADOOP-18457_temp/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java#L133-L136
   3. https://github.com/anmolanmol1234/hadoop/blob/HADOOP-18457_temp/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java#L267-L269
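
   A rough sketch of what such a synchronized block could look like. This is
   not code from the PR: IdleAwareAnalyzer, idleLock, suspendTimer and
   resumeTimerIfIdle are hypothetical names, and a plain java.util.Timer
   stands in for whatever the analyzer actually schedules. The idea is that
   the idle flag and the timer are only ever read or changed together under
   one lock, so a suspend and a resume cannot interleave between the check
   and the update.

```java
import java.util.Timer;
import java.util.TimerTask;

// Hypothetical stand-in for the analyzer; not the real AbfsClientThrottlingAnalyzer.
class IdleAwareAnalyzer {
  private final Object idleLock = new Object();
  private final long periodMs;
  private boolean idle;   // guarded by idleLock
  private Timer timer;    // guarded by idleLock

  IdleAwareAnalyzer(long periodMs) {
    this.periodMs = periodMs;
    synchronized (idleLock) {
      startTimerLocked();
    }
  }

  // Caller must hold idleLock.
  private void startTimerLocked() {
    timer = new Timer("analysis-timer", true); // daemon thread
    timer.schedule(new TimerTask() {
      @Override
      public void run() {
        // Periodic analysis would run here.
      }
    }, periodMs, periodMs);
  }

  // Suspend path: flag and timer change together under the lock.
  void suspendTimer() {
    synchronized (idleLock) {
      if (!idle) {
        timer.cancel();
        idle = true;
      }
    }
  }

  // Resume path: the check and the restart are one atomic step.
  // Returns true only for the caller that actually restarted the timer.
  boolean resumeTimerIfIdle() {
    synchronized (idleLock) {
      if (!idle) {
        return false;
      }
      startTimerLocked();
      idle = false;
      return true;
    }
  }

  boolean isIdle() {
    synchronized (idleLock) {
      return idle;
    }
  }
}

class IdleAwareAnalyzerDemo {
  public static void main(String[] args) {
    IdleAwareAnalyzer analyzer = new IdleAwareAnalyzer(1000L);
    analyzer.suspendTimer();
    System.out.println("idle after suspend: " + analyzer.isIdle());
    System.out.println("resumed: " + analyzer.resumeTimerIfIdle());
    analyzer.suspendTimer(); // stop the timer before exiting
  }
}
```

   With this shape, suspendIfNecessary would call resumeTimerIfIdle() rather
   than reading the flag first and resuming afterwards as two separate steps.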



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -38,35 +40,89 @@
  * and sleeps just enough to minimize errors, allowing optimal ingress and/or
  * egress throughput.
  */
-public final class AbfsClientThrottlingIntercept {
+public final class AbfsClientThrottlingIntercept implements AbfsThrottlingIntercept {
   private static final Logger LOG = LoggerFactory.getLogger(
       AbfsClientThrottlingIntercept.class);
   private static final String RANGE_PREFIX = "bytes=";
-  private static AbfsClientThrottlingIntercept singleton = null;
-  private AbfsClientThrottlingAnalyzer readThrottler = null;
-  private AbfsClientThrottlingAnalyzer writeThrottler = null;
-  private static boolean isAutoThrottlingEnabled = false;
+  private static AbfsClientThrottlingIntercept singleton; // singleton, initialized in static initialization block
+  private static final ReentrantLock LOCK = new ReentrantLock();
+  private final AbfsClientThrottlingAnalyzer readThrottler;
+  private final AbfsClientThrottlingAnalyzer writeThrottler;
+  private final String accountName;
 
   // Hide default constructor
-  private AbfsClientThrottlingIntercept() {
-    readThrottler = new AbfsClientThrottlingAnalyzer("read");
-    writeThrottler = new AbfsClientThrottlingAnalyzer("write");
+  public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsConfiguration) {
+    this.accountName = accountName;
+    this.readThrottler = setAnalyzer("read " + accountName, abfsConfiguration);
+    this.writeThrottler = setAnalyzer("write " + accountName, abfsConfiguration);
+    LOG.debug("Client-side throttling is enabled for the ABFS file system for the account : {}", accountName);
   }
 
-  public static synchronized void initializeSingleton(boolean enableAutoThrottling) {
-    if (!enableAutoThrottling) {
-      return;
-    }
+  // Hide default constructor
+  private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
+    //Account name is kept as empty as same instance is shared across all accounts
+    this.accountName = "";
+    this.readThrottler = setAnalyzer("read", abfsConfiguration);
+    this.writeThrottler = setAnalyzer("write", abfsConfiguration);
+    LOG.debug("Client-side throttling is enabled for the ABFS file system using singleton intercept");
+  }
+
+  /**
+   * Sets the analyzer for the intercept.
+   * @param name Name of the analyzer.
+   * @param abfsConfiguration The configuration.
+   * @return AbfsClientThrottlingAnalyzer instance.
+   */
+  private AbfsClientThrottlingAnalyzer setAnalyzer(String name, AbfsConfiguration abfsConfiguration) {
+    return new AbfsClientThrottlingAnalyzer(name, abfsConfiguration);
+  }
+
+  /**
+   * Returns the analyzer for read operations.
+   * @return AbfsClientThrottlingAnalyzer for read.
+   */
+  AbfsClientThrottlingAnalyzer getReadThrottler() {
+    return readThrottler;
+  }
+
+  /**
+   * Returns the analyzer for write operations.
+   * @return AbfsClientThrottlingAnalyzer for write.
+   */
+  AbfsClientThrottlingAnalyzer getWriteThrottler() {
+    return writeThrottler;
+  }
+
+  /**
+   * Creates a singleton object of the AbfsClientThrottlingIntercept.
+   * which is shared across all filesystem instances.
+   * @param abfsConfiguration configuration set.
+   * @return singleton object of intercept.
+   */
+  static AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsConfiguration) {
     if (singleton == null) {
-      singleton = new AbfsClientThrottlingIntercept();
-      isAutoThrottlingEnabled = true;
-      LOG.debug("Client-side throttling is enabled for the ABFS file system.");
+      LOCK.lock();
+      try {
+        if (singleton == null) {
+          singleton = new AbfsClientThrottlingIntercept(abfsConfiguration);
+          LOG.debug("Client-side throttling is enabled for the ABFS file system.");
+        }
+      } finally {
+        LOCK.unlock();
+      }
     }
+    return singleton;
   }
 
-  static void updateMetrics(AbfsRestOperationType operationType,
-                            AbfsHttpOperation abfsHttpOperation) {
-    if (!isAutoThrottlingEnabled || abfsHttpOperation == null) {
+  /**
+   * Updates the metrics for successful and failed read and write operations.
+   * @param operationType Only applicable for read and write operations.
+   * @param abfsHttpOperation Used for status code and data transfeered.

Review Comment:
   nit: typo, this should be "transferred".



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -38,35 +40,89 @@
  * and sleeps just enough to minimize errors, allowing optimal ingress and/or
  * egress throughput.
  */
-public final class AbfsClientThrottlingIntercept {
+public final class AbfsClientThrottlingIntercept implements AbfsThrottlingIntercept {
   private static final Logger LOG = LoggerFactory.getLogger(
       AbfsClientThrottlingIntercept.class);
   private static final String RANGE_PREFIX = "bytes=";
-  private static AbfsClientThrottlingIntercept singleton = null;
-  private AbfsClientThrottlingAnalyzer readThrottler = null;
-  private AbfsClientThrottlingAnalyzer writeThrottler = null;
-  private static boolean isAutoThrottlingEnabled = false;
+  private static AbfsClientThrottlingIntercept singleton; // singleton, initialized in static initialization block
+  private static final ReentrantLock LOCK = new ReentrantLock();
+  private final AbfsClientThrottlingAnalyzer readThrottler;
+  private final AbfsClientThrottlingAnalyzer writeThrottler;
+  private final String accountName;
 
   // Hide default constructor
-  private AbfsClientThrottlingIntercept() {
-    readThrottler = new AbfsClientThrottlingAnalyzer("read");
-    writeThrottler = new AbfsClientThrottlingAnalyzer("write");
+  public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsConfiguration) {
+    this.accountName = accountName;
+    this.readThrottler = setAnalyzer("read " + accountName, abfsConfiguration);
+    this.writeThrottler = setAnalyzer("write " + accountName, abfsConfiguration);
+    LOG.debug("Client-side throttling is enabled for the ABFS file system for the account : {}", accountName);
   }
 
-  public static synchronized void initializeSingleton(boolean enableAutoThrottling) {
-    if (!enableAutoThrottling) {
-      return;
-    }
+  // Hide default constructor
+  private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
+    //Account name is kept as empty as same instance is shared across all accounts
+    this.accountName = "";
+    this.readThrottler = setAnalyzer("read", abfsConfiguration);
+    this.writeThrottler = setAnalyzer("write", abfsConfiguration);
+    LOG.debug("Client-side throttling is enabled for the ABFS file system using singleton intercept");
+  }
+
+  /**
+   * Sets the analyzer for the intercept.
+   * @param name Name of the analyzer.
+   * @param abfsConfiguration The configuration.
+   * @return AbfsClientThrottlingAnalyzer instance.
+   */
+  private AbfsClientThrottlingAnalyzer setAnalyzer(String name, AbfsConfiguration abfsConfiguration) {
+    return new AbfsClientThrottlingAnalyzer(name, abfsConfiguration);
+  }
+
+  /**
+   * Returns the analyzer for read operations.
+   * @return AbfsClientThrottlingAnalyzer for read.
+   */
+  AbfsClientThrottlingAnalyzer getReadThrottler() {
+    return readThrottler;
+  }
+
+  /**
+   * Returns the analyzer for write operations.
+   * @return AbfsClientThrottlingAnalyzer for write.
+   */
+  AbfsClientThrottlingAnalyzer getWriteThrottler() {
+    return writeThrottler;
+  }
+
+  /**
+   * Creates a singleton object of the AbfsClientThrottlingIntercept.
+   * which is shared across all filesystem instances.
+   * @param abfsConfiguration configuration set.
+   * @return singleton object of intercept.
+   */
+  static AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsConfiguration) {
     if (singleton == null) {
-      singleton = new AbfsClientThrottlingIntercept();
-      isAutoThrottlingEnabled = true;
-      LOG.debug("Client-side throttling is enabled for the ABFS file system.");
+      LOCK.lock();
+      try {
+        if (singleton == null) {
+          singleton = new AbfsClientThrottlingIntercept(abfsConfiguration);
+          LOG.debug("Client-side throttling is enabled for the ABFS file system.");
+        }
+      } finally {

Review Comment:
   We can remove the try/finally block.
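
   One possible reading of this suggestion, as a sketch only: if the explicit
   ReentrantLock is replaced by a synchronized block, the monitor is released
   automatically on exit, so no try/finally is needed. SingletonInterceptSketch
   and its String argument are hypothetical stand-ins for the real class and
   AbfsConfiguration. Separately, the Java memory model requires the field to
   be volatile for double-checked locking to be safe, which the field in the
   diff above is not.

```java
// Hypothetical stand-in; not the real AbfsClientThrottlingIntercept.
final class SingletonInterceptSketch {
  // volatile is required for double-checked locking to publish safely.
  private static volatile SingletonInterceptSketch singleton;

  private final String label;

  private SingletonInterceptSketch(String label) {
    this.label = label;
  }

  static SingletonInterceptSketch initializeSingleton(String label) {
    SingletonInterceptSketch local = singleton; // single volatile read on the fast path
    if (local == null) {
      // The monitor is released when the block exits, even on an exception,
      // so there is no unlock call to protect with try/finally.
      synchronized (SingletonInterceptSketch.class) {
        local = singleton;
        if (local == null) {
          local = new SingletonInterceptSketch(label);
          singleton = local;
        }
      }
    }
    return local;
  }

  String getLabel() {
    return label;
  }
}

class SingletonInterceptSketchDemo {
  public static void main(String[] args) {
    SingletonInterceptSketch first = SingletonInterceptSketch.initializeSingleton("first");
    SingletonInterceptSketch second = SingletonInterceptSketch.initializeSingleton("second");
    System.out.println("same instance: " + (first == second));
  }
}
```

   If a ReentrantLock is kept instead, the usual guidance is to keep the
   unlock in a finally block so the lock is released when construction throws.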





> ABFS: Support for account level throttling
> ------------------------------------------
>
>                 Key: HADOOP-18457
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18457
>             Project: Hadoop Common
>          Issue Type: Sub-task
>    Affects Versions: 3.3.4
>            Reporter: Anmol Asrani
>            Assignee: Anmol Asrani
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 3.4.0
>
>
> To add support for throttling at account level



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
