[ https://issues.apache.org/jira/browse/HADOOP-18146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17651093#comment-17651093 ]
ASF GitHub Bot commented on HADOOP-18146:
-----------------------------------------
pranavsaxena-microsoft commented on code in PR #4039:
URL: https://github.com/apache/hadoop/pull/4039#discussion_r1055102136
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -681,35 +685,36 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
abfsUriQueryBuilder, cachedSasToken);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
- final AbfsRestOperation op = new AbfsRestOperation(
- AbfsRestOperationType.Append,
- this,
- HTTP_METHOD_PUT,
- url,
- requestHeaders,
- buffer,
- reqParams.getoffset(),
- reqParams.getLength(),
- sasTokenForReuse);
+ final AbfsRestOperation op = getAbfsRestOperationForAppend(AbfsRestOperationType.Append,
+ HTTP_METHOD_PUT, url, requestHeaders, buffer, reqParams.getoffset(),
+ reqParams.getLength(), sasTokenForReuse);
try {
op.execute(tracingContext);
} catch (AzureBlobFileSystemException e) {
+ // If the http response code indicates a user error we retry
+ // the same append request with expect header disabled.
+ // When "100-continue" header is enabled but a non Http 100 response comes,
+ // JDK fails to provide all response headers.
Review Comment:
Kindly elaborate on, or cite a resource for, `JDK fails to provide all response headers.`
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java:
##########
@@ -314,13 +327,46 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio
if (this.isTraceEnabled) {
startTime = System.nanoTime();
}
- try (OutputStream outputStream = this.connection.getOutputStream()) {
- // update bytes sent before they are sent so we may observe
- // attempted sends as well as successful sends via the
- // accompanying statusCode
+ OutputStream outputStream = null;
+ // Updates the expected bytes to be sent based on length.
+ this.expectedBytesToBeSent = length;
+ try {
+ try {
+ /* Without expect header enabled, if getOutputStream() throws
+ an exception, it gets caught by the restOperation. But with
+ expect header enabled we return back without throwing an exception
+ for the correct response code processing.
+ */
+ outputStream = this.connection.getOutputStream();
+ } catch (IOException e) {
+ /* If getOutputStream fails with an exception and expect header
+ is enabled, we return back without throwing an exception to
+ the caller. The caller is responsible for setting the correct status code.
+ If expect header is not enabled, we throw back the exception.
+ */
+ String expectHeader = this.connection.getRequestProperty(EXPECT);
+ if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)) {
+ LOG.debug("Getting output stream failed with expect header enabled,
returning back "
+ + ExceptionUtils.getStackTrace(e));
Review Comment:
LOG.debug("message", exceptionObj) will do the same thing, printing the stack trace.
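A small illustration of this suggestion (a sketch, not the patch itself; the class name and exception type here are assumptions):

```java
import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DebugLogSketch {
  private static final Logger LOG =
      LoggerFactory.getLogger(DebugLogSketch.class);

  void onOutputStreamFailure(IOException e) {
    // Passing the throwable as the last argument makes SLF4J render the
    // full stack trace itself, so ExceptionUtils.getStackTrace(e) and the
    // string concatenation become unnecessary.
    LOG.debug("Getting output stream failed with expect header enabled,"
        + " returning back", e);
  }
}
```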
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -229,14 +229,23 @@ private void completeExecute(TracingContext tracingContext)
}
}
- if (result.getStatusCode() >= HttpURLConnection.HTTP_BAD_REQUEST) {
Review Comment:
Please add a comment on why an exception is being thrown for status < 200.
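For reference, a minimal sketch of the kind of range check the requested comment could document (the exact condition and exception used in the patch are assumptions based on this excerpt):

```java
import java.net.HttpURLConnection;

final class StatusRangeSketch {
  // Treat anything outside [200, 400) as a failure: statuses below 200
  // include the 0/-1 placeholders produced when the connection is torn
  // down before a response arrives, and statuses >= 400 are ordinary
  // HTTP errors.
  static boolean indicatesFailure(int statusCode) {
    return statusCode < HttpURLConnection.HTTP_OK
        || statusCode >= HttpURLConnection.HTTP_BAD_REQUEST;
  }
}
```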
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java:
##########
@@ -314,13 +327,46 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio
if (this.isTraceEnabled) {
startTime = System.nanoTime();
}
- try (OutputStream outputStream = this.connection.getOutputStream()) {
- // update bytes sent before they are sent so we may observe
- // attempted sends as well as successful sends via the
- // accompanying statusCode
+ OutputStream outputStream = null;
+ // Updates the expected bytes to be sent based on length.
+ this.expectedBytesToBeSent = length;
+ try {
+ try {
+ /* Without expect header enabled, if getOutputStream() throws
+ an exception, it gets caught by the restOperation. But with
+ expect header enabled we return back without throwing an exception
+ for the correct response code processing.
+ */
+ outputStream = this.connection.getOutputStream();
+ } catch (IOException e) {
+ /* If getOutputStream fails with an exception and expect header
+ is enabled, we return back without throwing an exception to
+ the caller. The caller is responsible for setting the correct status code.
+ If expect header is not enabled, we throw back the exception.
+ */
+ String expectHeader = this.connection.getRequestProperty(EXPECT);
+ if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)) {
+ LOG.debug("Getting output stream failed with expect header enabled,
returning back "
+ + ExceptionUtils.getStackTrace(e));
+ return;
Review Comment:
After the sendRequest method completes, AbfsRestOperation will trigger
httpOperation.processResponse. If we have not sent the request, what do we
expect from connection.getInputStream, connection.getResponseCode, and
connection.getHeaderField?
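One way the code could address this concern, as a hypothetical sketch (the flag name and early-return shape are illustrative, not taken from the patch):

```java
import java.io.IOException;
import java.net.HttpURLConnection;

class ProcessResponseGuardSketch {
  private final HttpURLConnection connection;
  // Set by sendRequest() when getOutputStream() failed with the expect
  // header enabled and the method returned early.
  private boolean requestNeverSent;

  ProcessResponseGuardSketch(HttpURLConnection connection) {
    this.connection = connection;
  }

  int processResponse() throws IOException {
    if (requestNeverSent) {
      // No request went out on this connection, so there is no usable
      // response: skip getResponseCode()/getInputStream()/getHeaderField().
      return -1;
    }
    return connection.getResponseCode();
  }
}
```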
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -315,12 +324,20 @@ private boolean executeHttpOperation(final int retryCount,
}
if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) {
- throw new InvalidAbfsRestOperationException(ex);
+ throw new InvalidAbfsRestOperationException(ex, retryCount);
}
return false;
} finally {
- intercept.updateMetrics(operationType, httpOperation);
+ int status = httpOperation.getStatusCode();
+ // If the socket is terminated prior to receiving a response, the HTTP
Review Comment:
Please cite a resource for the HTTP status being 0 or -1.
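For what the hunk appears to be doing, a hedged sketch (HttpURLConnection.getResponseCode() is documented to return -1 when no valid status line can be discerned; the 0 case is assumed to come from an uninitialized status field):

```java
final class MetricsGuardSketch {
  // 100-continue boundary; mirrors the HTTP_CONTINUE constant used in the
  // diff (its exact source in the patch is not shown in this excerpt).
  private static final int HTTP_CONTINUE = 100;

  // Only report throttling metrics when a real HTTP status was received;
  // 0 and -1 indicate the socket died before any status line arrived.
  static boolean shouldUpdateMetrics(int statusCode) {
    return statusCode >= HTTP_CONTINUE;
  }
}
```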
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/InvalidAbfsRestOperationException.java:
##########
@@ -30,14 +30,33 @@
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class InvalidAbfsRestOperationException extends AbfsRestOperationException {
+
+ private static final String ERROR_MESSAGE = "InvalidAbfsRestOperationException";
+
public InvalidAbfsRestOperationException(
final Exception innerException) {
super(
AzureServiceErrorCode.UNKNOWN.getStatusCode(),
AzureServiceErrorCode.UNKNOWN.getErrorCode(),
innerException != null
? innerException.toString()
- : "InvalidAbfsRestOperationException",
+ : ERROR_MESSAGE,
Review Comment:
Should we have this.getClass().getName()?
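One detail worth noting for this suggestion: `this` cannot be referenced before the superclass constructor completes, so a literal `this.getClass().getName()` inside the `super(...)` call would not compile, while a class literal would. A minimal sketch (using RuntimeException so the snippet stands alone, rather than the real AbfsRestOperationException super signature):

```java
public class InvalidAbfsRestOperationSketch extends RuntimeException {
  public InvalidAbfsRestOperationSketch(final Exception innerException) {
    // this.getClass().getName() is illegal in a super() argument because
    // `this` may not be referenced until super() has run; the class
    // literal below is the closest compilable alternative.
    super(innerException != null
            ? innerException.toString()
            : InvalidAbfsRestOperationSketch.class.getName(),
        innerException);
  }
}
```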
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java:
##########
@@ -314,13 +327,46 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio
if (this.isTraceEnabled) {
startTime = System.nanoTime();
}
- try (OutputStream outputStream = this.connection.getOutputStream()) {
- // update bytes sent before they are sent so we may observe
- // attempted sends as well as successful sends via the
- // accompanying statusCode
+ OutputStream outputStream = null;
+ // Updates the expected bytes to be sent based on length.
+ this.expectedBytesToBeSent = length;
+ try {
+ try {
+ /* Without expect header enabled, if getOutputStream() throws
+ an exception, it gets caught by the restOperation. But with
+ expect header enabled we return back without throwing an exception
+ for the correct response code processing.
+ */
+ outputStream = this.connection.getOutputStream();
+ } catch (IOException e) {
+ /* If getOutputStream fails with an exception and expect header
+ is enabled, we return back without throwing an exception to
+ the caller. The caller is responsible for setting the correct status code.
+ If expect header is not enabled, we throw back the exception.
+ */
+ String expectHeader = this.connection.getRequestProperty(EXPECT);
+ if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)) {
+ LOG.debug("Getting output stream failed with expect header enabled,
returning back "
+ + ExceptionUtils.getStackTrace(e));
+ return;
+ } else {
+ LOG.debug("Getting output stream failed without expect header
enabled, throwing exception "
+ + ExceptionUtils.getStackTrace(e));
Review Comment:
LOG.debug("message", exceptionObj) will do the same thing, printing the stack trace.
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java:
##########
@@ -404,4 +431,152 @@ public static AbfsRestOperation getRestOp(AbfsRestOperationType type,
public static AccessTokenProvider getAccessTokenProvider(AbfsClient client) {
return client.getTokenProvider();
}
+
+ /**
+ * Test helper method to get random bytes array.
+ * @param length The length of byte buffer.
+ * @return byte buffer.
+ */
+ private byte[] getRandomBytesArray(int length) {
+ final byte[] b = new byte[length];
+ new Random().nextBytes(b);
+ return b;
+ }
+
+ /**
+ * Test to verify that client retries append request without
+ * expect header enabled if append with expect header enabled fails
+ * with 4xx kind of error.
+ * @throws Exception
+ */
+ @Test
+ public void testExpectHundredContinue() throws Exception {
+ // Get the filesystem.
+ final AzureBlobFileSystem fs = getFileSystem();
+
+ final Configuration configuration = new Configuration();
+ configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
+ AbfsClient abfsClient = fs.getAbfsStore().getClient();
+
+ AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
+ configuration.get(FS_AZURE_ABFS_ACCOUNT_NAME));
+
+ // Update the configuration with reduced retry count and reduced backoff interval.
+ AbfsConfiguration abfsConfig
+ = TestAbfsConfigurationFieldsValidation.updateRetryConfigs(
+ abfsConfiguration,
+ REDUCED_RETRY_COUNT, REDUCED_BACKOFF_INTERVAL);
+
+ // Gets the client.
+ AbfsClient testClient = Mockito.spy(
+ TestAbfsClient.createTestClientFromCurrentContext(
+ abfsClient,
+ abfsConfig));
+
+ // Create the append request params with expect header enabled initially.
+ AppendRequestParameters appendRequestParameters
+ = new AppendRequestParameters(
+ BUFFER_OFFSET, BUFFER_OFFSET, BUFFER_LENGTH,
+ AppendRequestParameters.Mode.APPEND_MODE, false, null, true);
+
+ byte[] buffer = getRandomBytesArray(BUFFER_LENGTH);
+
+ // Create a test container to upload the data.
+ Path testPath = path(TEST_PATH);
+ fs.create(testPath);
+ String finalTestPath = testPath.toString()
+ .substring(testPath.toString().lastIndexOf("/"));
+
+ // Creates a list of request headers.
+ final List<AbfsHttpHeader> requestHeaders
+ = TestAbfsClient.getTestRequestHeaders(testClient);
+ requestHeaders.add(
+ new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH));
+ if (appendRequestParameters.isExpectHeaderEnabled()) {
+ requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
+ }
+
+ // Updates the query parameters.
+ final AbfsUriQueryBuilder abfsUriQueryBuilder
+ = testClient.createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION,
+ Long.toString(appendRequestParameters.getPosition()));
+
+ // Creates the url for the specified path.
+ URL url = testClient.createRequestUrl(finalTestPath, abfsUriQueryBuilder.toString());
+
+ // Create a mock of the AbfsRestOperation to set the urlConnection in the corresponding httpOperation.
+ AbfsRestOperation op = Mockito.spy(new AbfsRestOperation(
+ AbfsRestOperationType.Append,
+ testClient,
+ HTTP_METHOD_PUT,
+ url,
+ requestHeaders, buffer,
+ appendRequestParameters.getoffset(),
+ appendRequestParameters.getLength(), null));
+
+ AbfsHttpOperation abfsHttpOperation = new AbfsHttpOperation(url,
+ HTTP_METHOD_PUT, requestHeaders);
+
+ // Create a mock of UrlConnection class.
+ HttpURLConnection urlConnection = mock(HttpURLConnection.class);
+
+ // Sets the expect request property if expect header is enabled.
+ if (appendRequestParameters.isExpectHeaderEnabled()) {
+ Mockito.doReturn(HUNDRED_CONTINUE).when(urlConnection)
+ .getRequestProperty(EXPECT);
+ }
+ Mockito.doNothing().when(urlConnection).setRequestProperty(Mockito
+ .any(), Mockito.any());
+ Mockito.doReturn(url).when(urlConnection).getURL();
+
+ // Give user error code 404 when processResponse is called.
+ Mockito.doReturn(HTTP_METHOD_PUT).when(urlConnection).getRequestMethod();
+ Mockito.doReturn(HTTP_NOT_FOUND).when(urlConnection).getResponseCode();
+ Mockito.doReturn("Resource Not Found")
+ .when(urlConnection)
+ .getResponseMessage();
+
+ // Make the getOutputStream throw IOException to see it returns from the sendRequest correctly.
+ Mockito.doThrow(new ProtocolException("Server rejected Operation"))
+ .when(urlConnection)
+ .getOutputStream();
+ abfsHttpOperation.setConnection(urlConnection);
Review Comment:
Let's not have a setter for connection in AbfsHttpOperation. What we can do
instead is:
1. Make abfsHttpOperation a spied object.
2. Have package-protected methods in AbfsHttpOperation for
getConnectionRequestMethod(), getConnectionResponseCode(), and
getConnectionResponseMessage(), which in the real object would call the
corresponding connection methods, while the spied object would have the
mocked behaviour (see the sketch below):
- getConnectionResponseCode: return HTTP_NOT_FOUND.
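A rough sketch of this suggestion (method names follow the comment above; the class shape is simplified and not taken from the final patch):

```java
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.net.HttpURLConnection;

class HttpOperationSketch {
  private HttpURLConnection connection;

  // Package-private pass-throughs that a Mockito spy can stub, removing
  // the need for a setConnection() test hook.
  String getConnRequestMethod() {
    return connection.getRequestMethod();
  }

  int getConnResponseCode() throws IOException {
    return connection.getResponseCode();
  }

  String getConnResponseMessage() throws IOException {
    return connection.getResponseMessage();
  }
}

class SpySketch {
  void stubResponse() throws IOException {
    HttpOperationSketch op = spy(new HttpOperationSketch());
    // The spy intercepts the wrappers, so the real connection (null here)
    // is never touched.
    doReturn("PUT").when(op).getConnRequestMethod();
    doReturn(HttpURLConnection.HTTP_NOT_FOUND).when(op).getConnResponseCode();
    doReturn("Resource Not Found").when(op).getConnResponseMessage();
  }
}
```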
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java:
##########
@@ -110,6 +115,10 @@ protected HttpURLConnection getConnection() {
return connection;
}
+ void setConnection(final HttpURLConnection connection) {
+ this.connection = connection;
+ }
+
Review Comment:
As suggested in TestAbfsClient, let's remove it.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java:
##########
@@ -126,7 +132,7 @@ public ExponentialRetryPolicy(final int retryCount, final int minBackoff, final
*/
public boolean shouldRetry(final int retryCount, final int statusCode) {
return retryCount < this.retryCount
- && (statusCode == -1
+ && (statusCode < HTTP_CONTINUE
Review Comment:
Please cite a reference for why we are retrying on this range of statuses.
> ABFS: Add changes for expect hundred continue header with append requests
> -------------------------------------------------------------------------
>
> Key: HADOOP-18146
> URL: https://issues.apache.org/jira/browse/HADOOP-18146
> Project: Hadoop Common
> Issue Type: Sub-task
> Affects Versions: 3.3.1
> Reporter: Anmol Asrani
> Assignee: Anmol Asrani
> Priority: Major
> Labels: pull-request-available
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> Heavy load from a Hadoop cluster led to high resource utilization at FE
> nodes. Investigations from the server side indicate payload buffering at
> Http.Sys as the cause. Payloads of requests that eventually fail due to
> throttling limits are also getting buffered, as buffering is triggered
> before the FE could start request processing.
> Approach: The client sends the Append HTTP request with the Expect header,
> but holds back payload transmission until the server replies with HTTP 100.
> We add this header to all append requests so as to reduce this buffering.
> We made several workload runs with and without hundred continue enabled, and
> the overall observations are:
> # The ratio of TCP SYN packet count with and without expect hundred continue
> enabled is 0.32 : 3 on average.
> # The ingress into the machine at the TCP level is almost 3 times lower with
> hundred continue enabled, which implies significant bandwidth savings.
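To make the mechanism concrete, here is a minimal sketch of expect-100-continue with the plain JDK HttpURLConnection (the URL and payload size are placeholders; this is not the ABFS client code):

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class ExpectContinueSketch {
  public static void main(String[] args) throws Exception {
    URL url = new URL("https://example.com/upload"); // placeholder endpoint
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("PUT");
    conn.setDoOutput(true);
    // Ask the server to vet the headers before the body is sent.
    conn.setRequestProperty("Expect", "100-continue");

    byte[] payload = new byte[4 * 1024 * 1024];
    try (OutputStream out = conn.getOutputStream()) {
      // With the expect header set, getOutputStream() performs the header
      // exchange first; if the server answers with anything other than
      // HTTP 100, the JDK throws (e.g. ProtocolException) and the payload
      // is never transmitted, which is the buffering saving described above.
      out.write(payload);
    }
    System.out.println("status=" + conn.getResponseCode());
  }
}
```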