pranavsaxena-microsoft commented on code in PR #5034:
URL: https://github.com/apache/hadoop/pull/5034#discussion_r1011670326


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java:
##########
@@ -95,6 +95,18 @@ private AbfsClientThrottlingAnalyzer() {
         analysisPeriodMs);
   }
 
+  /**
+   * Resumes the timer if it was stopped.
+   */
+  public void resumeTimer() {

Review Comment:
   Let's make it private.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -216,6 +218,10 @@ SharedKeyCredentials getSharedKeyCredentials() {
     return sharedKeyCredentials;
   }
 
+  public AbfsThrottlingIntercept getIntercept() {

Review Comment:
   Let's make it 'default' (package-private) access. The reason is that, as a 
public method, it can be accessed by an end-developer, who might call 
updateMetrics in such a way that our client doesn't throttle in a suitable manner.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java:
##########
@@ -104,19 +113,24 @@ private AbfsClientThrottlingAnalyzer() {
   public void addBytesTransferred(long count, boolean isFailedOperation) {
     AbfsOperationMetrics metrics = blobMetrics.get();
     if (isFailedOperation) {
-      metrics.bytesFailed.addAndGet(count);
-      metrics.operationsFailed.incrementAndGet();
+      metrics.getBytesFailed().addAndGet(count);
+      metrics.getOperationsFailed().incrementAndGet();
     } else {
-      metrics.bytesSuccessful.addAndGet(count);
-      metrics.operationsSuccessful.incrementAndGet();
+      metrics.getBytesSuccessful().addAndGet(count);
+      metrics.getOperationsSuccessful().incrementAndGet();
     }
+    blobMetrics.set(metrics);
   }
 
   /**
    * Suspends the current storage operation, as necessary, to reduce 
throughput.
    * @return true if Thread sleeps(Throttling occurs) else false.
    */
   public boolean suspendIfNecessary() {
+    if (isOperationOnAccountIdle.get()) {
+      resumeTimer();
+    }

Review Comment:
   A still better approach, I feel (I just thought of it now), is to have a 
synchronized block that deals with isOperationOnAccountIdle (for both suspend 
and resume), because race conditions can still happen. For 
example:
   1. 
https://github.com/anmolanmol1234/hadoop/blob/HADOOP-18457_temp/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java#L266
   2. 
https://github.com/anmolanmol1234/hadoop/blob/HADOOP-18457_temp/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java#L133-L136
   3. 
https://github.com/anmolanmol1234/hadoop/blob/HADOOP-18457_temp/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java#L267-L269



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -38,35 +40,89 @@
  * and sleeps just enough to minimize errors, allowing optimal ingress and/or
  * egress throughput.
  */
-public final class AbfsClientThrottlingIntercept {
+public final class AbfsClientThrottlingIntercept implements 
AbfsThrottlingIntercept {
   private static final Logger LOG = LoggerFactory.getLogger(
       AbfsClientThrottlingIntercept.class);
   private static final String RANGE_PREFIX = "bytes=";
-  private static AbfsClientThrottlingIntercept singleton = null;
-  private AbfsClientThrottlingAnalyzer readThrottler = null;
-  private AbfsClientThrottlingAnalyzer writeThrottler = null;
-  private static boolean isAutoThrottlingEnabled = false;
+  private static AbfsClientThrottlingIntercept singleton; // singleton, 
initialized in static initialization block
+  private static final ReentrantLock LOCK = new ReentrantLock();
+  private final AbfsClientThrottlingAnalyzer readThrottler;
+  private final AbfsClientThrottlingAnalyzer writeThrottler;
+  private final String accountName;
 
   // Hide default constructor
-  private AbfsClientThrottlingIntercept() {
-    readThrottler = new AbfsClientThrottlingAnalyzer("read");
-    writeThrottler = new AbfsClientThrottlingAnalyzer("write");
+  public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration 
abfsConfiguration) {
+    this.accountName = accountName;
+    this.readThrottler = setAnalyzer("read " + accountName, abfsConfiguration);
+    this.writeThrottler = setAnalyzer("write " + accountName, 
abfsConfiguration);
+    LOG.debug("Client-side throttling is enabled for the ABFS file system for 
the account : {}", accountName);
   }
 
-  public static synchronized void initializeSingleton(boolean 
enableAutoThrottling) {
-    if (!enableAutoThrottling) {
-      return;
-    }
+  // Hide default constructor
+  private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
+    //Account name is kept as empty as same instance is shared across all 
accounts
+    this.accountName = "";
+    this.readThrottler = setAnalyzer("read", abfsConfiguration);
+    this.writeThrottler = setAnalyzer("write", abfsConfiguration);
+    LOG.debug("Client-side throttling is enabled for the ABFS file system 
using singleton intercept");
+  }
+
+  /**
+   * Sets the analyzer for the intercept.
+   * @param name Name of the analyzer.
+   * @param abfsConfiguration The configuration.
+   * @return AbfsClientThrottlingAnalyzer instance.
+   */
+  private AbfsClientThrottlingAnalyzer setAnalyzer(String name, 
AbfsConfiguration abfsConfiguration) {
+    return new AbfsClientThrottlingAnalyzer(name, abfsConfiguration);
+  }
+
+  /**
+   * Returns the analyzer for read operations.
+   * @return AbfsClientThrottlingAnalyzer for read.
+   */
+  AbfsClientThrottlingAnalyzer getReadThrottler() {
+    return readThrottler;
+  }
+
+  /**
+   * Returns the analyzer for write operations.
+   * @return AbfsClientThrottlingAnalyzer for write.
+   */
+  AbfsClientThrottlingAnalyzer getWriteThrottler() {
+    return writeThrottler;
+  }
+
+  /**
+   * Creates a singleton object of the AbfsClientThrottlingIntercept.
+   * which is shared across all filesystem instances.
+   * @param abfsConfiguration configuration set.
+   * @return singleton object of intercept.
+   */
+  static AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration 
abfsConfiguration) {
     if (singleton == null) {
-      singleton = new AbfsClientThrottlingIntercept();
-      isAutoThrottlingEnabled = true;
-      LOG.debug("Client-side throttling is enabled for the ABFS file system.");
+      LOCK.lock();
+      try {
+        if (singleton == null) {
+          singleton = new AbfsClientThrottlingIntercept(abfsConfiguration);
+          LOG.debug("Client-side throttling is enabled for the ABFS file 
system.");
+        }
+      } finally {
+        LOCK.unlock();
+      }
     }
+    return singleton;
   }
 
-  static void updateMetrics(AbfsRestOperationType operationType,
-                            AbfsHttpOperation abfsHttpOperation) {
-    if (!isAutoThrottlingEnabled || abfsHttpOperation == null) {
+  /**
+   * Updates the metrics for successful and failed read and write operations.
+   * @param operationType Only applicable for read and write operations.
+   * @param abfsHttpOperation Used for status code and data transfeered.

Review Comment:
   nit: typo, should be "transferred".



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -38,35 +40,89 @@
  * and sleeps just enough to minimize errors, allowing optimal ingress and/or
  * egress throughput.
  */
-public final class AbfsClientThrottlingIntercept {
+public final class AbfsClientThrottlingIntercept implements 
AbfsThrottlingIntercept {
   private static final Logger LOG = LoggerFactory.getLogger(
       AbfsClientThrottlingIntercept.class);
   private static final String RANGE_PREFIX = "bytes=";
-  private static AbfsClientThrottlingIntercept singleton = null;
-  private AbfsClientThrottlingAnalyzer readThrottler = null;
-  private AbfsClientThrottlingAnalyzer writeThrottler = null;
-  private static boolean isAutoThrottlingEnabled = false;
+  private static AbfsClientThrottlingIntercept singleton; // singleton, 
initialized in static initialization block
+  private static final ReentrantLock LOCK = new ReentrantLock();
+  private final AbfsClientThrottlingAnalyzer readThrottler;
+  private final AbfsClientThrottlingAnalyzer writeThrottler;
+  private final String accountName;
 
   // Hide default constructor
-  private AbfsClientThrottlingIntercept() {
-    readThrottler = new AbfsClientThrottlingAnalyzer("read");
-    writeThrottler = new AbfsClientThrottlingAnalyzer("write");
+  public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration 
abfsConfiguration) {
+    this.accountName = accountName;
+    this.readThrottler = setAnalyzer("read " + accountName, abfsConfiguration);
+    this.writeThrottler = setAnalyzer("write " + accountName, 
abfsConfiguration);
+    LOG.debug("Client-side throttling is enabled for the ABFS file system for 
the account : {}", accountName);
   }
 
-  public static synchronized void initializeSingleton(boolean 
enableAutoThrottling) {
-    if (!enableAutoThrottling) {
-      return;
-    }
+  // Hide default constructor
+  private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
+    //Account name is kept as empty as same instance is shared across all 
accounts
+    this.accountName = "";
+    this.readThrottler = setAnalyzer("read", abfsConfiguration);
+    this.writeThrottler = setAnalyzer("write", abfsConfiguration);
+    LOG.debug("Client-side throttling is enabled for the ABFS file system 
using singleton intercept");
+  }
+
+  /**
+   * Sets the analyzer for the intercept.
+   * @param name Name of the analyzer.
+   * @param abfsConfiguration The configuration.
+   * @return AbfsClientThrottlingAnalyzer instance.
+   */
+  private AbfsClientThrottlingAnalyzer setAnalyzer(String name, 
AbfsConfiguration abfsConfiguration) {
+    return new AbfsClientThrottlingAnalyzer(name, abfsConfiguration);
+  }
+
+  /**
+   * Returns the analyzer for read operations.
+   * @return AbfsClientThrottlingAnalyzer for read.
+   */
+  AbfsClientThrottlingAnalyzer getReadThrottler() {
+    return readThrottler;
+  }
+
+  /**
+   * Returns the analyzer for write operations.
+   * @return AbfsClientThrottlingAnalyzer for write.
+   */
+  AbfsClientThrottlingAnalyzer getWriteThrottler() {
+    return writeThrottler;
+  }
+
+  /**
+   * Creates a singleton object of the AbfsClientThrottlingIntercept.
+   * which is shared across all filesystem instances.
+   * @param abfsConfiguration configuration set.
+   * @return singleton object of intercept.
+   */
+  static AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration 
abfsConfiguration) {
     if (singleton == null) {
-      singleton = new AbfsClientThrottlingIntercept();
-      isAutoThrottlingEnabled = true;
-      LOG.debug("Client-side throttling is enabled for the ABFS file system.");
+      LOCK.lock();
+      try {
+        if (singleton == null) {
+          singleton = new AbfsClientThrottlingIntercept(abfsConfiguration);
+          LOG.debug("Client-side throttling is enabled for the ABFS file 
system.");
+        }
+      } finally {

Review Comment:
   We can remove the try/finally block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to