[
https://issues.apache.org/jira/browse/HADOOP-18640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17696699#comment-17696699
]
ASF GitHub Bot commented on HADOOP-18640:
-----------------------------------------
saxenapranav commented on code in PR #5446:
URL: https://github.com/apache/hadoop/pull/5446#discussion_r1125941455
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java:
##########
@@ -50,6 +50,14 @@ public final class FileSystemConfigurations {
public static final int
DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF_INTERVAL = SIXTY_SECONDS;
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF
= 2;
+ // Throttling Analysis defaults.
+ public static final double DEFAULT_MIN_ACCEPTABLE_ERROR_PERCENTAGE = .1;
Review Comment:
nit: write this as `0.1` rather than `.1`.
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -285,6 +299,154 @@ public void testAbfsConfigConstructor() throws Exception {
Assert.assertEquals("Delta backoff interval was not set as expected.",
expectedDeltaBackoff, policy.getDeltaBackoff());
}
+// public void testClientBackoffOnlyNewRequest() throws IOException {
+ @Test
+ public void testClientBackoffOnlyNewWriteRequest() throws IOException,
InterruptedException {
+ AzureBlobFileSystem fs = getFileSystem();
+ AbfsClient client = fs.getAbfsStore().getClient();
+ AbfsConfiguration configuration = client.getAbfsConfiguration();
+ Assume.assumeTrue(configuration.isAutoThrottlingEnabled());
+ AbfsCounters counters = client.getAbfsCounters();
+
+ URL dummyUrl = client.createRequestUrl("/", "");
+ String dummyMethod = HTTP_METHOD_PUT;
+
+ AbfsRestOperationType testOperationType = AbfsRestOperationType.Append;
+
+ AbfsRestOperation restOp = new AbfsRestOperation(testOperationType,
client, dummyMethod, dummyUrl, new ArrayList<>());
+
+ Long writeThrottleStatBefore =
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+ Thread.sleep(10000);
+ boolean appliedBackoff = restOp.applyThrottlingBackoff(0,
testOperationType, counters);
+ assertEquals(true, appliedBackoff);
+ Long writeThrottleStatAfter =
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+ assertEquals(new Long(writeThrottleStatBefore+1), writeThrottleStatAfter);
+
+
+ writeThrottleStatBefore =
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+ appliedBackoff = restOp.applyThrottlingBackoff(1, testOperationType,
counters);
+ assertEquals(false, appliedBackoff);
+ writeThrottleStatAfter =
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+ assertEquals(writeThrottleStatBefore, writeThrottleStatAfter);
+ }
Review Comment:
What happens inside throttlingIntercept is out of scope for this PR. This
test does not exercise the change we want to make; it only checks whether the
if-else block works correctly. Even if we want to test that, let's mock the
intercept and just check whether `intercept.sendingRequest` is called.
In short, let's not get into what happens inside throttlingIntercept.
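A minimal sketch of that idea follows. It uses a hand-written recording fake
instead of Mockito so that it stands alone; `ThrottlingIntercept`,
`RecordingIntercept` and `BackoffGate` are hypothetical stand-ins, not classes
from the PR, and the operation type is reduced to a plain string. In the real
test this would presumably be a mocked `AbfsThrottlingIntercept` checked with
`Mockito.verify`, as `testReadThrottleNewRequest` already does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the throttling intercept (assumption, not PR code).
interface ThrottlingIntercept {
  void sendingRequest(String operationType);
}

// Records every call and does no throttling work of its own.
class RecordingIntercept implements ThrottlingIntercept {
  final List<String> calls = new ArrayList<>();

  @Override
  public void sendingRequest(String operationType) {
    calls.add(operationType);
  }
}

// Hypothetical stand-in that mirrors the if/else in applyThrottlingBackoff.
class BackoffGate {
  private final ThrottlingIntercept intercept;

  BackoffGate(ThrottlingIntercept intercept) {
    this.intercept = intercept;
  }

  boolean applyThrottlingBackoff(int retryCount, String operationType) {
    if (retryCount == 0) {
      // New request: hand over to the intercept.
      intercept.sendingRequest(operationType);
      return true;
    }
    // Retried request: skip the intercept entirely.
    return false;
  }
}
```

The test then only needs to assert on `calls.size()` after invoking the gate
with retry count 0 and 1; no sleep and no throttle statistics are involved.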
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java:
##########
@@ -50,6 +50,14 @@ public final class FileSystemConfigurations {
public static final int
DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF_INTERVAL = SIXTY_SECONDS;
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF
= 2;
+ // Throttling Analysis defaults.
+ public static final double DEFAULT_MIN_ACCEPTABLE_ERROR_PERCENTAGE = .1;
Review Comment:
Please also check the other double values added to the default configs for
the same leading-zero issue. This is a nit.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -334,6 +337,25 @@ private boolean executeHttpOperation(final int retryCount,
return true;
}
+ /**
+ * Makes a call for client side throttling based on
+ * the request count.
+ * @param operationType operation type of current request
+ * @param abfsCounters AbfsCounters instance
+ */
+ @VisibleForTesting
+ boolean applyThrottlingBackoff(int retryCount, AbfsRestOperationType
operationType, AbfsCounters abfsCounters) {
+ if (retryCount == 0) {
+ intercept.sendingRequest(operationType, abfsCounters);
+ return true;
+ }
+ return false;
+ }
+
+ public AbfsHttpOperation createHttpOperationInstance() throws IOException {
Review Comment:
Don't make this public; package-private is enough. Have it like:
```
AbfsHttpOperation createHttpOperationInstance() throws IOException {
```
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java:
##########
@@ -268,10 +269,55 @@
DefaultValue = DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS)
private int accountOperationIdleTimeout;
+ /*
+ Analysis Period for client-side throttling
+ */
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_ANALYSIS_PERIOD,
DefaultValue = DEFAULT_ANALYSIS_PERIOD_MS)
private int analysisPeriod;
+ /*
Review Comment:
Let's make this a Javadoc comment?
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -285,6 +299,154 @@ public void testAbfsConfigConstructor() throws Exception {
Assert.assertEquals("Delta backoff interval was not set as expected.",
expectedDeltaBackoff, policy.getDeltaBackoff());
}
+// public void testClientBackoffOnlyNewRequest() throws IOException {
Review Comment:
nit: let's remove this commented-out line.
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -285,6 +299,154 @@ public void testAbfsConfigConstructor() throws Exception {
Assert.assertEquals("Delta backoff interval was not set as expected.",
expectedDeltaBackoff, policy.getDeltaBackoff());
}
+// public void testClientBackoffOnlyNewRequest() throws IOException {
+ @Test
+ public void testClientBackoffOnlyNewWriteRequest() throws IOException,
InterruptedException {
+ AzureBlobFileSystem fs = getFileSystem();
+ AbfsClient client = fs.getAbfsStore().getClient();
+ AbfsConfiguration configuration = client.getAbfsConfiguration();
+ Assume.assumeTrue(configuration.isAutoThrottlingEnabled());
+ AbfsCounters counters = client.getAbfsCounters();
+
+ URL dummyUrl = client.createRequestUrl("/", "");
+ String dummyMethod = HTTP_METHOD_PUT;
+
+ AbfsRestOperationType testOperationType = AbfsRestOperationType.Append;
+
+ AbfsRestOperation restOp = new AbfsRestOperation(testOperationType,
client, dummyMethod, dummyUrl, new ArrayList<>());
+
+ Long writeThrottleStatBefore =
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+ Thread.sleep(10000);
+ boolean appliedBackoff = restOp.applyThrottlingBackoff(0,
testOperationType, counters);
+ assertEquals(true, appliedBackoff);
+ Long writeThrottleStatAfter =
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+ assertEquals(new Long(writeThrottleStatBefore+1), writeThrottleStatAfter);
+
+
+ writeThrottleStatBefore =
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+ appliedBackoff = restOp.applyThrottlingBackoff(1, testOperationType,
counters);
+ assertEquals(false, appliedBackoff);
+ writeThrottleStatAfter =
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+ assertEquals(writeThrottleStatBefore, writeThrottleStatAfter);
+ }
+
+ @Test
+ public void testClientBackoffOnlyNewReadRequest() throws IOException,
InterruptedException {
+ AzureBlobFileSystem fs = getFileSystem();
+ AbfsClient client = fs.getAbfsStore().getClient();
+ AbfsConfiguration configuration = client.getAbfsConfiguration();
+ Assume.assumeTrue(configuration.isAutoThrottlingEnabled());
+ AbfsCounters counters = client.getAbfsCounters();
+
+ URL dummyUrl = client.createRequestUrl("/", "");
+ String dummyMethod = AbfsHttpConstants.HTTP_METHOD_GET;
+
+ AbfsRestOperationType testOperationType = AbfsRestOperationType.ReadFile;
+
+ AbfsRestOperation restOp = new AbfsRestOperation(testOperationType,
client, dummyMethod, dummyUrl, new ArrayList<>());
+
+ Long readThrottleStatBefore =
counters.toMap().get(AbfsStatistic.READ_THROTTLES.getStatName());
+ Thread.sleep(10000);
+ boolean appliedBackoff = restOp.applyThrottlingBackoff(0,
testOperationType, counters);
+ assertEquals(true, appliedBackoff);
+ Long readThrottleStatAfter =
counters.toMap().get(AbfsStatistic.READ_THROTTLES.getStatName());
+ assertEquals(new Long(readThrottleStatBefore+1), readThrottleStatAfter);
+
+
+ readThrottleStatBefore =
counters.toMap().get(AbfsStatistic.READ_THROTTLES.getStatName());
+ appliedBackoff = restOp.applyThrottlingBackoff(1, testOperationType,
counters);
Review Comment:
Same comment as above.
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -285,6 +299,154 @@ public void testAbfsConfigConstructor() throws Exception {
Assert.assertEquals("Delta backoff interval was not set as expected.",
expectedDeltaBackoff, policy.getDeltaBackoff());
}
+// public void testClientBackoffOnlyNewRequest() throws IOException {
+ @Test
+ public void testClientBackoffOnlyNewWriteRequest() throws IOException,
InterruptedException {
+ AzureBlobFileSystem fs = getFileSystem();
+ AbfsClient client = fs.getAbfsStore().getClient();
+ AbfsConfiguration configuration = client.getAbfsConfiguration();
+ Assume.assumeTrue(configuration.isAutoThrottlingEnabled());
+ AbfsCounters counters = client.getAbfsCounters();
+
+ URL dummyUrl = client.createRequestUrl("/", "");
+ String dummyMethod = HTTP_METHOD_PUT;
+
+ AbfsRestOperationType testOperationType = AbfsRestOperationType.Append;
+
+ AbfsRestOperation restOp = new AbfsRestOperation(testOperationType,
client, dummyMethod, dummyUrl, new ArrayList<>());
+
+ Long writeThrottleStatBefore =
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+ Thread.sleep(10000);
+ boolean appliedBackoff = restOp.applyThrottlingBackoff(0,
testOperationType, counters);
+ assertEquals(true, appliedBackoff);
+ Long writeThrottleStatAfter =
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+ assertEquals(new Long(writeThrottleStatBefore+1), writeThrottleStatAfter);
+
+
+ writeThrottleStatBefore =
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+ appliedBackoff = restOp.applyThrottlingBackoff(1, testOperationType,
counters);
+ assertEquals(false, appliedBackoff);
+ writeThrottleStatAfter =
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+ assertEquals(writeThrottleStatBefore, writeThrottleStatAfter);
+ }
+
+ @Test
+ public void testClientBackoffOnlyNewReadRequest() throws IOException,
InterruptedException {
+ AzureBlobFileSystem fs = getFileSystem();
+ AbfsClient client = fs.getAbfsStore().getClient();
+ AbfsConfiguration configuration = client.getAbfsConfiguration();
+ Assume.assumeTrue(configuration.isAutoThrottlingEnabled());
+ AbfsCounters counters = client.getAbfsCounters();
+
+ URL dummyUrl = client.createRequestUrl("/", "");
+ String dummyMethod = AbfsHttpConstants.HTTP_METHOD_GET;
+
+ AbfsRestOperationType testOperationType = AbfsRestOperationType.ReadFile;
+
+ AbfsRestOperation restOp = new AbfsRestOperation(testOperationType,
client, dummyMethod, dummyUrl, new ArrayList<>());
+
+ Long readThrottleStatBefore =
counters.toMap().get(AbfsStatistic.READ_THROTTLES.getStatName());
+ Thread.sleep(10000);
+ boolean appliedBackoff = restOp.applyThrottlingBackoff(0,
testOperationType, counters);
+ assertEquals(true, appliedBackoff);
+ Long readThrottleStatAfter =
counters.toMap().get(AbfsStatistic.READ_THROTTLES.getStatName());
+ assertEquals(new Long(readThrottleStatBefore+1), readThrottleStatAfter);
+
+
+ readThrottleStatBefore =
counters.toMap().get(AbfsStatistic.READ_THROTTLES.getStatName());
+ appliedBackoff = restOp.applyThrottlingBackoff(1, testOperationType,
counters);
+ assertEquals(false, appliedBackoff);
+ readThrottleStatAfter =
counters.toMap().get(AbfsStatistic.READ_THROTTLES.getStatName());
+ assertEquals(readThrottleStatBefore, readThrottleStatAfter);
+ }
+
+ @Test
+ public void testReadThrottleNewRequest() throws IOException {
+ AzureBlobFileSystem fs = getFileSystem();
+ AbfsClient client = Mockito.spy(fs.getAbfsStore().getClient());
+ AbfsConfiguration configuration = client.getAbfsConfiguration();
+ Assume.assumeTrue(configuration.isAutoThrottlingEnabled());
+ AbfsCounters counters = client.getAbfsCounters();
+
+ AbfsThrottlingIntercept intercept =
Mockito.mock(AbfsThrottlingIntercept.class);
+
Mockito.doNothing().when(intercept).sendingRequest(Mockito.any(AbfsRestOperationType.class),
Mockito.any(AbfsCounters.class));
+ Mockito.doReturn(intercept).when(client).getIntercept();
+
+ // setting up the spy AbfsRestOperation class for read
+ final List<AbfsHttpHeader> requestHeaders = client.createDefaultHeaders();
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder =
client.createDefaultUriQueryBuilder();
+
+ final URL url = client.createRequestUrl("/dummyReadFile",
abfsUriQueryBuilder.toString());
+ final AbfsRestOperation mockRestOp = Mockito.spy(new AbfsRestOperation(
+ AbfsRestOperationType.ReadFile,
+ client,
+ HTTP_METHOD_GET,
+ url,
+ requestHeaders));
+
+ // setting up mock behavior for the AbfsHttpOperation class
+ AbfsHttpOperation mockHttpOp =
Mockito.spy(mockRestOp.createHttpOperationInstance());
+ Mockito.doReturn(-1)
+ .doReturn(-1)
+ .doReturn(-1)
+ .doReturn(HTTP_OK)
+ .when(mockHttpOp).getStatusCode();
+
Mockito.doNothing().when(mockHttpOp).setRequestProperty(nullable(String.class),
nullable(String.class));
+ Mockito.doNothing().when(mockHttpOp).sendRequest(nullable(byte[].class),
nullable(int.class), nullable(int.class));
+
Mockito.doNothing().when(mockHttpOp).processResponse(nullable(byte[].class),
nullable(int.class), nullable(int.class));
+
+
Mockito.doReturn(mockHttpOp).when(mockRestOp).createHttpOperationInstance();
+ Mockito.doReturn(mockHttpOp).when(mockRestOp).getResult();
+
+ mockRestOp.execute(getTestTracingContext(fs, false));
+ Mockito.verify(intercept,
times(1)).sendingRequest(Mockito.any(AbfsRestOperationType.class),
Mockito.any(AbfsCounters.class));
Review Comment:
Since you plan to keep `applyThrottlingBackoff` in your code, let's also check
that `applyThrottlingBackoff` is called 4 times. It would be a great addition.
What do you think?
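A rough sketch of the expected counts, assuming `applyThrottlingBackoff` is
invoked once per attempt inside the retry loop (the diff shown here does not
include that call site). `RetryLoopSketch` is a hypothetical stand-in with
plain counters; it is not the PR's `AbfsRestOperation`. The scripted status
codes match the test's three `-1` responses followed by `HTTP_OK`.

```java
// Hypothetical stand-in for a rest operation with a retry loop (assumption).
class RetryLoopSketch {
  static final int HTTP_OK = 200;

  int applyBackoffCalls = 0;    // how often the gate was consulted
  int sendingRequestCalls = 0;  // how often the intercept would be hit

  private final int[] statusCodes;

  RetryLoopSketch(int... statusCodes) {
    this.statusCodes = statusCodes;
  }

  boolean applyThrottlingBackoff(int retryCount) {
    applyBackoffCalls++;
    if (retryCount == 0) {
      sendingRequestCalls++;  // stands in for intercept.sendingRequest(...)
      return true;
    }
    return false;
  }

  // Runs one attempt per scripted status code and stops at the first HTTP_OK.
  void execute() {
    for (int retryCount = 0; retryCount < statusCodes.length; retryCount++) {
      applyThrottlingBackoff(retryCount);
      if (statusCodes[retryCount] == HTTP_OK) {
        return;
      }
    }
  }
}
```

With the script `-1, -1, -1, HTTP_OK` the gate is consulted four times and the
intercept once. In the real test the same check could likely be expressed with
`Mockito.verify` and `times(4)` on the spied rest operation.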
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java:
##########
@@ -285,6 +299,154 @@ public void testAbfsConfigConstructor() throws Exception {
Assert.assertEquals("Delta backoff interval was not set as expected.",
expectedDeltaBackoff, policy.getDeltaBackoff());
}
+// public void testClientBackoffOnlyNewRequest() throws IOException {
+ @Test
+ public void testClientBackoffOnlyNewWriteRequest() throws IOException,
InterruptedException {
+ AzureBlobFileSystem fs = getFileSystem();
+ AbfsClient client = fs.getAbfsStore().getClient();
+ AbfsConfiguration configuration = client.getAbfsConfiguration();
+ Assume.assumeTrue(configuration.isAutoThrottlingEnabled());
+ AbfsCounters counters = client.getAbfsCounters();
+
+ URL dummyUrl = client.createRequestUrl("/", "");
+ String dummyMethod = HTTP_METHOD_PUT;
+
+ AbfsRestOperationType testOperationType = AbfsRestOperationType.Append;
+
+ AbfsRestOperation restOp = new AbfsRestOperation(testOperationType,
client, dummyMethod, dummyUrl, new ArrayList<>());
+
+ Long writeThrottleStatBefore =
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+ Thread.sleep(10000);
+ boolean appliedBackoff = restOp.applyThrottlingBackoff(0,
testOperationType, counters);
+ assertEquals(true, appliedBackoff);
+ Long writeThrottleStatAfter =
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+ assertEquals(new Long(writeThrottleStatBefore+1), writeThrottleStatAfter);
+
+
+ writeThrottleStatBefore =
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+ appliedBackoff = restOp.applyThrottlingBackoff(1, testOperationType,
counters);
+ assertEquals(false, appliedBackoff);
+ writeThrottleStatAfter =
counters.toMap().get(AbfsStatistic.WRITE_THROTTLES.getStatName());
+ assertEquals(writeThrottleStatBefore, writeThrottleStatAfter);
+ }
+
+ @Test
+ public void testClientBackoffOnlyNewReadRequest() throws IOException,
InterruptedException {
+ AzureBlobFileSystem fs = getFileSystem();
+ AbfsClient client = fs.getAbfsStore().getClient();
+ AbfsConfiguration configuration = client.getAbfsConfiguration();
+ Assume.assumeTrue(configuration.isAutoThrottlingEnabled());
+ AbfsCounters counters = client.getAbfsCounters();
+
+ URL dummyUrl = client.createRequestUrl("/", "");
+ String dummyMethod = AbfsHttpConstants.HTTP_METHOD_GET;
+
+ AbfsRestOperationType testOperationType = AbfsRestOperationType.ReadFile;
+
+ AbfsRestOperation restOp = new AbfsRestOperation(testOperationType,
client, dummyMethod, dummyUrl, new ArrayList<>());
+
+ Long readThrottleStatBefore =
counters.toMap().get(AbfsStatistic.READ_THROTTLES.getStatName());
+ Thread.sleep(10000);
+ boolean appliedBackoff = restOp.applyThrottlingBackoff(0,
testOperationType, counters);
+ assertEquals(true, appliedBackoff);
+ Long readThrottleStatAfter =
counters.toMap().get(AbfsStatistic.READ_THROTTLES.getStatName());
+ assertEquals(new Long(readThrottleStatBefore+1), readThrottleStatAfter);
+
+
+ readThrottleStatBefore =
counters.toMap().get(AbfsStatistic.READ_THROTTLES.getStatName());
+ appliedBackoff = restOp.applyThrottlingBackoff(1, testOperationType,
counters);
+ assertEquals(false, appliedBackoff);
+ readThrottleStatAfter =
counters.toMap().get(AbfsStatistic.READ_THROTTLES.getStatName());
+ assertEquals(readThrottleStatBefore, readThrottleStatAfter);
+ }
+
+ @Test
+ public void testReadThrottleNewRequest() throws IOException {
+ AzureBlobFileSystem fs = getFileSystem();
+ AbfsClient client = Mockito.spy(fs.getAbfsStore().getClient());
+ AbfsConfiguration configuration = client.getAbfsConfiguration();
+ Assume.assumeTrue(configuration.isAutoThrottlingEnabled());
+ AbfsCounters counters = client.getAbfsCounters();
+
+ AbfsThrottlingIntercept intercept =
Mockito.mock(AbfsThrottlingIntercept.class);
+
Mockito.doNothing().when(intercept).sendingRequest(Mockito.any(AbfsRestOperationType.class),
Mockito.any(AbfsCounters.class));
+ Mockito.doReturn(intercept).when(client).getIntercept();
+
+ // setting up the spy AbfsRestOperation class for read
+ final List<AbfsHttpHeader> requestHeaders = client.createDefaultHeaders();
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder =
client.createDefaultUriQueryBuilder();
+
+ final URL url = client.createRequestUrl("/dummyReadFile",
abfsUriQueryBuilder.toString());
+ final AbfsRestOperation mockRestOp = Mockito.spy(new AbfsRestOperation(
+ AbfsRestOperationType.ReadFile,
+ client,
+ HTTP_METHOD_GET,
+ url,
+ requestHeaders));
+
+ // setting up mock behavior for the AbfsHttpOperation class
+ AbfsHttpOperation mockHttpOp =
Mockito.spy(mockRestOp.createHttpOperationInstance());
+ Mockito.doReturn(-1)
+ .doReturn(-1)
+ .doReturn(-1)
+ .doReturn(HTTP_OK)
+ .when(mockHttpOp).getStatusCode();
+
Mockito.doNothing().when(mockHttpOp).setRequestProperty(nullable(String.class),
nullable(String.class));
+ Mockito.doNothing().when(mockHttpOp).sendRequest(nullable(byte[].class),
nullable(int.class), nullable(int.class));
+
Mockito.doNothing().when(mockHttpOp).processResponse(nullable(byte[].class),
nullable(int.class), nullable(int.class));
+
+
Mockito.doReturn(mockHttpOp).when(mockRestOp).createHttpOperationInstance();
+ Mockito.doReturn(mockHttpOp).when(mockRestOp).getResult();
+
+ mockRestOp.execute(getTestTracingContext(fs, false));
+ Mockito.verify(intercept,
times(1)).sendingRequest(Mockito.any(AbfsRestOperationType.class),
Mockito.any(AbfsCounters.class));
+ }
+
+ @Test
+ public void testWriteThrottleNewRequest() throws IOException {
Review Comment:
This is no different from `testReadThrottleNewRequest`; only the API
changes. We can remove it, since it adds nothing significant. What do you
think?
> ABFS: Enabling Client-side Backoff only for new requests
> --------------------------------------------------------
>
> Key: HADOOP-18640
> URL: https://issues.apache.org/jira/browse/HADOOP-18640
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Reporter: Sree Bhattacharyya
> Assignee: Sree Bhattacharyya
> Priority: Minor
> Labels: pull-request-available
>
> Enabling backoff only for new requests that happen, and disabling for retried
> requests.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)