saxenapranav commented on code in PR #5446:
URL: https://github.com/apache/hadoop/pull/5446#discussion_r1125941455


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java:
##########
@@ -50,6 +50,14 @@ public final class FileSystemConfigurations {
   public static final int 
DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF_INTERVAL = SIXTY_SECONDS;
   public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF 
= 2;
 
+  // Throttling Analysis defaults.
+  public static final double DEFAULT_MIN_ACCEPTABLE_ERROR_PERCENTAGE = .1;

Review Comment:
   nit: write this as `0.1` (with a leading zero) rather than `.1`.



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -285,6 +299,154 @@ public void testAbfsConfigConstructor() throws Exception {
     Assert.assertEquals("Delta backoff interval was not set as expected.", 
expectedDeltaBackoff, policy.getDeltaBackoff());
   }
 
+//  public void testClientBackoffOnlyNewRequest() throws IOException {
+  @Test
+  public void testClientBackoffOnlyNewWriteRequest() throws IOException, 
InterruptedException {
+    AzureBlobFileSystem fs = getFileSystem();
+    AbfsClient client = fs.getAbfsStore().getClient();
+    AbfsConfiguration configuration = client.getAbfsConfiguration();
+    Assume.assumeTrue(configuration.isAutoThrottlingEnabled());
+    AbfsCounters counters = client.getAbfsCounters();
+
+    URL dummyUrl = client.createRequestUrl("/", "");
+    String dummyMethod = HTTP_METHOD_PUT;
+
+    AbfsRestOperationType testOperationType = AbfsRestOperationType.Append;
+
+    AbfsRestOperation restOp = new AbfsRestOperation(testOperationType, 
client, dummyMethod, dummyUrl, new ArrayList<>());
+
+    Long writeThrottleStatBefore = 
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+    Thread.sleep(10000);
+    boolean appliedBackoff = restOp.applyThrottlingBackoff(0, 
testOperationType, counters);
+    assertEquals(true, appliedBackoff);
+    Long writeThrottleStatAfter = 
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+    assertEquals(new Long(writeThrottleStatBefore+1), writeThrottleStatAfter);
+
+
+    writeThrottleStatBefore = 
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+    appliedBackoff = restOp.applyThrottlingBackoff(1, testOperationType, 
counters);
+    assertEquals(false, appliedBackoff);
+    writeThrottleStatAfter = 
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+    assertEquals(writeThrottleStatBefore, writeThrottleStatAfter);
+  }

Review Comment:
   What happens inside throttlingIntercept is out of scope for this PR. This 
test is not testing the change we want to make; it only checks whether the 
if-else block works correctly. Even if we want to test that, let's mock the 
intercept and just check whether `intercept.sendingRequest` is called or not.
   What I want to say is: let's not get into what happens inside 
throttlingIntercept.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java:
##########
@@ -50,6 +50,14 @@ public final class FileSystemConfigurations {
   public static final int 
DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF_INTERVAL = SIXTY_SECONDS;
   public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF 
= 2;
 
+  // Throttling Analysis defaults.
+  public static final double DEFAULT_MIN_ACCEPTABLE_ERROR_PERCENTAGE = .1;

Review Comment:
   Please also check the other double values in the default configs added.
   (This is a nit comment.)



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -334,6 +337,25 @@ private boolean executeHttpOperation(final int retryCount,
     return true;
   }
 
+  /**
+   * Makes a call for client side throttling based on
+   * the request count.
+   * @param operationType operation type of current request
+   * @param abfsCounters AbfsCounters instance
+   */
+  @VisibleForTesting
+  boolean applyThrottlingBackoff(int retryCount, AbfsRestOperationType 
operationType, AbfsCounters abfsCounters) {
+    if (retryCount == 0) {
+      intercept.sendingRequest(operationType, abfsCounters);
+      return true;
+    }
+    return false;
+  }
+
+  public AbfsHttpOperation createHttpOperationInstance() throws IOException {

Review Comment:
   Don't make it public. Declare it without the `public` modifier, like:
   ```
    AbfsHttpOperation createHttpOperationInstance() throws IOException {
   ```



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java:
##########
@@ -268,10 +269,55 @@
       DefaultValue = DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS)
   private int accountOperationIdleTimeout;
 
+  /*
+  Analysis Period for client-side throttling
+   */
   @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_ANALYSIS_PERIOD,
           DefaultValue = DEFAULT_ANALYSIS_PERIOD_MS)
   private int analysisPeriod;
 
+  /*

Review Comment:
   Let's write this as a Javadoc comment?



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -285,6 +299,154 @@ public void testAbfsConfigConstructor() throws Exception {
     Assert.assertEquals("Delta backoff interval was not set as expected.", 
expectedDeltaBackoff, policy.getDeltaBackoff());
   }
 
+//  public void testClientBackoffOnlyNewRequest() throws IOException {

Review Comment:
   nit: let's remove this line.



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -285,6 +299,154 @@ public void testAbfsConfigConstructor() throws Exception {
     Assert.assertEquals("Delta backoff interval was not set as expected.", 
expectedDeltaBackoff, policy.getDeltaBackoff());
   }
 
+//  public void testClientBackoffOnlyNewRequest() throws IOException {
+  @Test
+  public void testClientBackoffOnlyNewWriteRequest() throws IOException, 
InterruptedException {
+    AzureBlobFileSystem fs = getFileSystem();
+    AbfsClient client = fs.getAbfsStore().getClient();
+    AbfsConfiguration configuration = client.getAbfsConfiguration();
+    Assume.assumeTrue(configuration.isAutoThrottlingEnabled());
+    AbfsCounters counters = client.getAbfsCounters();
+
+    URL dummyUrl = client.createRequestUrl("/", "");
+    String dummyMethod = HTTP_METHOD_PUT;
+
+    AbfsRestOperationType testOperationType = AbfsRestOperationType.Append;
+
+    AbfsRestOperation restOp = new AbfsRestOperation(testOperationType, 
client, dummyMethod, dummyUrl, new ArrayList<>());
+
+    Long writeThrottleStatBefore = 
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+    Thread.sleep(10000);
+    boolean appliedBackoff = restOp.applyThrottlingBackoff(0, 
testOperationType, counters);
+    assertEquals(true, appliedBackoff);
+    Long writeThrottleStatAfter = 
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+    assertEquals(new Long(writeThrottleStatBefore+1), writeThrottleStatAfter);
+
+
+    writeThrottleStatBefore = 
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+    appliedBackoff = restOp.applyThrottlingBackoff(1, testOperationType, 
counters);
+    assertEquals(false, appliedBackoff);
+    writeThrottleStatAfter = 
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+    assertEquals(writeThrottleStatBefore, writeThrottleStatAfter);
+  }
+
+  @Test
+  public void testClientBackoffOnlyNewReadRequest() throws IOException, 
InterruptedException {
+    AzureBlobFileSystem fs = getFileSystem();
+    AbfsClient client = fs.getAbfsStore().getClient();
+    AbfsConfiguration configuration = client.getAbfsConfiguration();
+    Assume.assumeTrue(configuration.isAutoThrottlingEnabled());
+    AbfsCounters counters = client.getAbfsCounters();
+
+    URL dummyUrl = client.createRequestUrl("/", "");
+    String dummyMethod = AbfsHttpConstants.HTTP_METHOD_GET;
+
+    AbfsRestOperationType testOperationType = AbfsRestOperationType.ReadFile;
+
+    AbfsRestOperation restOp = new AbfsRestOperation(testOperationType, 
client, dummyMethod, dummyUrl, new ArrayList<>());
+
+    Long readThrottleStatBefore = 
counters.toMap().get(AbfsStatistic.READ_THROTTLES.getStatName());
+    Thread.sleep(10000);
+    boolean appliedBackoff = restOp.applyThrottlingBackoff(0, 
testOperationType, counters);
+    assertEquals(true, appliedBackoff);
+    Long readThrottleStatAfter = 
counters.toMap().get(AbfsStatistic.READ_THROTTLES.getStatName());
+    assertEquals(new Long(readThrottleStatBefore+1), readThrottleStatAfter);
+
+
+    readThrottleStatBefore = 
counters.toMap().get(AbfsStatistic.READ_THROTTLES.getStatName());
+    appliedBackoff = restOp.applyThrottlingBackoff(1, testOperationType, 
counters);

Review Comment:
   same comment as above.



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -285,6 +299,154 @@ public void testAbfsConfigConstructor() throws Exception {
     Assert.assertEquals("Delta backoff interval was not set as expected.", 
expectedDeltaBackoff, policy.getDeltaBackoff());
   }
 
+//  public void testClientBackoffOnlyNewRequest() throws IOException {
+  @Test
+  public void testClientBackoffOnlyNewWriteRequest() throws IOException, 
InterruptedException {
+    AzureBlobFileSystem fs = getFileSystem();
+    AbfsClient client = fs.getAbfsStore().getClient();
+    AbfsConfiguration configuration = client.getAbfsConfiguration();
+    Assume.assumeTrue(configuration.isAutoThrottlingEnabled());
+    AbfsCounters counters = client.getAbfsCounters();
+
+    URL dummyUrl = client.createRequestUrl("/", "");
+    String dummyMethod = HTTP_METHOD_PUT;
+
+    AbfsRestOperationType testOperationType = AbfsRestOperationType.Append;
+
+    AbfsRestOperation restOp = new AbfsRestOperation(testOperationType, 
client, dummyMethod, dummyUrl, new ArrayList<>());
+
+    Long writeThrottleStatBefore = 
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+    Thread.sleep(10000);
+    boolean appliedBackoff = restOp.applyThrottlingBackoff(0, 
testOperationType, counters);
+    assertEquals(true, appliedBackoff);
+    Long writeThrottleStatAfter = 
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+    assertEquals(new Long(writeThrottleStatBefore+1), writeThrottleStatAfter);
+
+
+    writeThrottleStatBefore = 
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+    appliedBackoff = restOp.applyThrottlingBackoff(1, testOperationType, 
counters);
+    assertEquals(false, appliedBackoff);
+    writeThrottleStatAfter = 
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+    assertEquals(writeThrottleStatBefore, writeThrottleStatAfter);
+  }
+
+  @Test
+  public void testClientBackoffOnlyNewReadRequest() throws IOException, 
InterruptedException {
+    AzureBlobFileSystem fs = getFileSystem();
+    AbfsClient client = fs.getAbfsStore().getClient();
+    AbfsConfiguration configuration = client.getAbfsConfiguration();
+    Assume.assumeTrue(configuration.isAutoThrottlingEnabled());
+    AbfsCounters counters = client.getAbfsCounters();
+
+    URL dummyUrl = client.createRequestUrl("/", "");
+    String dummyMethod = AbfsHttpConstants.HTTP_METHOD_GET;
+
+    AbfsRestOperationType testOperationType = AbfsRestOperationType.ReadFile;
+
+    AbfsRestOperation restOp = new AbfsRestOperation(testOperationType, 
client, dummyMethod, dummyUrl, new ArrayList<>());
+
+    Long readThrottleStatBefore = 
counters.toMap().get(AbfsStatistic.READ_THROTTLES.getStatName());
+    Thread.sleep(10000);
+    boolean appliedBackoff = restOp.applyThrottlingBackoff(0, 
testOperationType, counters);
+    assertEquals(true, appliedBackoff);
+    Long readThrottleStatAfter = 
counters.toMap().get(AbfsStatistic.READ_THROTTLES.getStatName());
+    assertEquals(new Long(readThrottleStatBefore+1), readThrottleStatAfter);
+
+
+    readThrottleStatBefore = 
counters.toMap().get(AbfsStatistic.READ_THROTTLES.getStatName());
+    appliedBackoff = restOp.applyThrottlingBackoff(1, testOperationType, 
counters);
+    assertEquals(false, appliedBackoff);
+    readThrottleStatAfter = 
counters.toMap().get(AbfsStatistic.READ_THROTTLES.getStatName());
+    assertEquals(readThrottleStatBefore, readThrottleStatAfter);
+  }
+
+  @Test
+  public void testReadThrottleNewRequest() throws IOException {
+    AzureBlobFileSystem fs = getFileSystem();
+    AbfsClient client = Mockito.spy(fs.getAbfsStore().getClient());
+    AbfsConfiguration configuration = client.getAbfsConfiguration();
+    Assume.assumeTrue(configuration.isAutoThrottlingEnabled());
+    AbfsCounters counters = client.getAbfsCounters();
+
+    AbfsThrottlingIntercept intercept = 
Mockito.mock(AbfsThrottlingIntercept.class);
+    
Mockito.doNothing().when(intercept).sendingRequest(Mockito.any(AbfsRestOperationType.class),
 Mockito.any(AbfsCounters.class));
+    Mockito.doReturn(intercept).when(client).getIntercept();
+
+    // setting up the spy AbfsRestOperation class for read
+    final List<AbfsHttpHeader> requestHeaders = client.createDefaultHeaders();
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = 
client.createDefaultUriQueryBuilder();
+
+    final URL url = client.createRequestUrl("/dummyReadFile", 
abfsUriQueryBuilder.toString());
+    final AbfsRestOperation mockRestOp = Mockito.spy(new AbfsRestOperation(
+            AbfsRestOperationType.ReadFile,
+            client,
+            HTTP_METHOD_GET,
+            url,
+            requestHeaders));
+
+    // setting up mock behavior for the AbfsHttpOperation class
+    AbfsHttpOperation mockHttpOp = 
Mockito.spy(mockRestOp.createHttpOperationInstance());
+    Mockito.doReturn(-1)
+            .doReturn(-1)
+            .doReturn(-1)
+            .doReturn(HTTP_OK)
+            .when(mockHttpOp).getStatusCode();
+    
Mockito.doNothing().when(mockHttpOp).setRequestProperty(nullable(String.class), 
nullable(String.class));
+    Mockito.doNothing().when(mockHttpOp).sendRequest(nullable(byte[].class), 
nullable(int.class), nullable(int.class));
+    
Mockito.doNothing().when(mockHttpOp).processResponse(nullable(byte[].class), 
nullable(int.class), nullable(int.class));
+
+    
Mockito.doReturn(mockHttpOp).when(mockRestOp).createHttpOperationInstance();
+    Mockito.doReturn(mockHttpOp).when(mockRestOp).getResult();
+
+    mockRestOp.execute(getTestTracingContext(fs, false));
+    Mockito.verify(intercept, 
times(1)).sendingRequest(Mockito.any(AbfsRestOperationType.class), 
Mockito.any(AbfsCounters.class));

Review Comment:
   Since you plan to keep `applyThrottlingBackoff` in your code, let's check 
that `applyThrottlingBackoff` is called 4 times. It would be a great addition. 
What do you think?



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -285,6 +299,154 @@ public void testAbfsConfigConstructor() throws Exception {
     Assert.assertEquals("Delta backoff interval was not set as expected.", 
expectedDeltaBackoff, policy.getDeltaBackoff());
   }
 
+//  public void testClientBackoffOnlyNewRequest() throws IOException {
+  @Test
+  public void testClientBackoffOnlyNewWriteRequest() throws IOException, 
InterruptedException {
+    AzureBlobFileSystem fs = getFileSystem();
+    AbfsClient client = fs.getAbfsStore().getClient();
+    AbfsConfiguration configuration = client.getAbfsConfiguration();
+    Assume.assumeTrue(configuration.isAutoThrottlingEnabled());
+    AbfsCounters counters = client.getAbfsCounters();
+
+    URL dummyUrl = client.createRequestUrl("/", "");
+    String dummyMethod = HTTP_METHOD_PUT;
+
+    AbfsRestOperationType testOperationType = AbfsRestOperationType.Append;
+
+    AbfsRestOperation restOp = new AbfsRestOperation(testOperationType, 
client, dummyMethod, dummyUrl, new ArrayList<>());
+
+    Long writeThrottleStatBefore = 
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+    Thread.sleep(10000);
+    boolean appliedBackoff = restOp.applyThrottlingBackoff(0, 
testOperationType, counters);
+    assertEquals(true, appliedBackoff);
+    Long writeThrottleStatAfter = 
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+    assertEquals(new Long(writeThrottleStatBefore+1), writeThrottleStatAfter);
+
+
+    writeThrottleStatBefore = 
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+    appliedBackoff = restOp.applyThrottlingBackoff(1, testOperationType, 
counters);
+    assertEquals(false, appliedBackoff);
+    writeThrottleStatAfter = 
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+    assertEquals(writeThrottleStatBefore, writeThrottleStatAfter);
+  }
+
+  @Test
+  public void testClientBackoffOnlyNewReadRequest() throws IOException, 
InterruptedException {
+    AzureBlobFileSystem fs = getFileSystem();
+    AbfsClient client = fs.getAbfsStore().getClient();
+    AbfsConfiguration configuration = client.getAbfsConfiguration();
+    Assume.assumeTrue(configuration.isAutoThrottlingEnabled());
+    AbfsCounters counters = client.getAbfsCounters();
+
+    URL dummyUrl = client.createRequestUrl("/", "");
+    String dummyMethod = AbfsHttpConstants.HTTP_METHOD_GET;
+
+    AbfsRestOperationType testOperationType = AbfsRestOperationType.ReadFile;
+
+    AbfsRestOperation restOp = new AbfsRestOperation(testOperationType, 
client, dummyMethod, dummyUrl, new ArrayList<>());
+
+    Long readThrottleStatBefore = 
counters.toMap().get(AbfsStatistic.READ_THROTTLES.getStatName());
+    Thread.sleep(10000);
+    boolean appliedBackoff = restOp.applyThrottlingBackoff(0, 
testOperationType, counters);
+    assertEquals(true, appliedBackoff);
+    Long readThrottleStatAfter = 
counters.toMap().get(AbfsStatistic.READ_THROTTLES.getStatName());
+    assertEquals(new Long(readThrottleStatBefore+1), readThrottleStatAfter);
+
+
+    readThrottleStatBefore = 
counters.toMap().get(AbfsStatistic.READ_THROTTLES.getStatName());
+    appliedBackoff = restOp.applyThrottlingBackoff(1, testOperationType, 
counters);
+    assertEquals(false, appliedBackoff);
+    readThrottleStatAfter = 
counters.toMap().get(AbfsStatistic.READ_THROTTLES.getStatName());
+    assertEquals(readThrottleStatBefore, readThrottleStatAfter);
+  }
+
+  @Test
+  public void testReadThrottleNewRequest() throws IOException {
+    AzureBlobFileSystem fs = getFileSystem();
+    AbfsClient client = Mockito.spy(fs.getAbfsStore().getClient());
+    AbfsConfiguration configuration = client.getAbfsConfiguration();
+    Assume.assumeTrue(configuration.isAutoThrottlingEnabled());
+    AbfsCounters counters = client.getAbfsCounters();
+
+    AbfsThrottlingIntercept intercept = 
Mockito.mock(AbfsThrottlingIntercept.class);
+    
Mockito.doNothing().when(intercept).sendingRequest(Mockito.any(AbfsRestOperationType.class),
 Mockito.any(AbfsCounters.class));
+    Mockito.doReturn(intercept).when(client).getIntercept();
+
+    // setting up the spy AbfsRestOperation class for read
+    final List<AbfsHttpHeader> requestHeaders = client.createDefaultHeaders();
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = 
client.createDefaultUriQueryBuilder();
+
+    final URL url = client.createRequestUrl("/dummyReadFile", 
abfsUriQueryBuilder.toString());
+    final AbfsRestOperation mockRestOp = Mockito.spy(new AbfsRestOperation(
+            AbfsRestOperationType.ReadFile,
+            client,
+            HTTP_METHOD_GET,
+            url,
+            requestHeaders));
+
+    // setting up mock behavior for the AbfsHttpOperation class
+    AbfsHttpOperation mockHttpOp = 
Mockito.spy(mockRestOp.createHttpOperationInstance());
+    Mockito.doReturn(-1)
+            .doReturn(-1)
+            .doReturn(-1)
+            .doReturn(HTTP_OK)
+            .when(mockHttpOp).getStatusCode();
+    
Mockito.doNothing().when(mockHttpOp).setRequestProperty(nullable(String.class), 
nullable(String.class));
+    Mockito.doNothing().when(mockHttpOp).sendRequest(nullable(byte[].class), 
nullable(int.class), nullable(int.class));
+    
Mockito.doNothing().when(mockHttpOp).processResponse(nullable(byte[].class), 
nullable(int.class), nullable(int.class));
+
+    
Mockito.doReturn(mockHttpOp).when(mockRestOp).createHttpOperationInstance();
+    Mockito.doReturn(mockHttpOp).when(mockRestOp).getResult();
+
+    mockRestOp.execute(getTestTracingContext(fs, false));
+    Mockito.verify(intercept, 
times(1)).sendingRequest(Mockito.any(AbfsRestOperationType.class), 
Mockito.any(AbfsCounters.class));
+  }
+
+  @Test
+  public void testWriteThrottleNewRequest() throws IOException {

Review Comment:
   This is no different from `testReadThrottleNewRequest`; only the API is 
changing. We can remove it, since it does not add anything significant. What 
do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to