[ 
https://issues.apache.org/jira/browse/HADOOP-18146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17651093#comment-17651093
 ] 

ASF GitHub Bot commented on HADOOP-18146:
-----------------------------------------

pranavsaxena-microsoft commented on code in PR #4039:
URL: https://github.com/apache/hadoop/pull/4039#discussion_r1055102136


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -681,35 +685,36 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
         abfsUriQueryBuilder, cachedSasToken);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
-    final AbfsRestOperation op = new AbfsRestOperation(
-        AbfsRestOperationType.Append,
-        this,
-        HTTP_METHOD_PUT,
-        url,
-        requestHeaders,
-        buffer,
-        reqParams.getoffset(),
-        reqParams.getLength(),
-        sasTokenForReuse);
+    final AbfsRestOperation op = getAbfsRestOperationForAppend(AbfsRestOperationType.Append,
+        HTTP_METHOD_PUT, url, requestHeaders, buffer, reqParams.getoffset(),
+        reqParams.getLength(), sasTokenForReuse);
     try {
       op.execute(tracingContext);
     } catch (AzureBlobFileSystemException e) {
+      // If the http response code indicates a user error we retry
+      // the same append request with expect header disabled.
+      // When "100-continue" header is enabled but a non Http 100 response comes,
+      // JDK fails to provide all response headers.

Review Comment:
   Kindly elaborate on, or cite a source for, the claim
   `JDK fails to provide all response headers.`



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java:
##########
@@ -314,13 +327,46 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio
     if (this.isTraceEnabled) {
       startTime = System.nanoTime();
     }
-    try (OutputStream outputStream = this.connection.getOutputStream()) {
-      // update bytes sent before they are sent so we may observe
-      // attempted sends as well as successful sends via the
-      // accompanying statusCode
+    OutputStream outputStream = null;
+    // Updates the expected bytes to be sent based on length.
+    this.expectedBytesToBeSent = length;
+    try {
+      try {
+        /* Without expect header enabled, if getOutputStream() throws
+           an exception, it gets caught by the restOperation. But with
+           expect header enabled we return back without throwing an exception
+           for the correct response code processing.
+         */
+        outputStream = this.connection.getOutputStream();
+      } catch (IOException e) {
+        /* If getOutputStream fails with an exception and expect header
+           is enabled, we return back without throwing an exception to
+           the caller. The caller is responsible for setting the correct status code.
+           If expect header is not enabled, we throw back the exception.
+         */
+        String expectHeader = this.connection.getRequestProperty(EXPECT);
+        if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)) {
+          LOG.debug("Getting output stream failed with expect header enabled, returning back "
+                  + ExceptionUtils.getStackTrace(e));

Review Comment:
   `LOG.debug("message", exceptionObj)` also prints the stack trace, so the
   explicit `ExceptionUtils.getStackTrace(e)` call is unnecessary.
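
For illustration, a minimal sketch of passing the exception object to the logger instead of concatenating its stack trace into the message. It uses `java.util.logging` as a dependency-free stand-in (the PR's `LOG` is presumably an SLF4J logger, whose `debug(String, Throwable)` overload plays the same role); the class and method names are hypothetical and not part of the PR.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical demo class; not part of the PR.
class ThrowableLoggingSketch {

  /** Collects log records in memory so they can be inspected. */
  static final class CapturingHandler extends Handler {
    final List<LogRecord> records = new ArrayList<>();
    @Override public void publish(LogRecord record) { records.add(record); }
    @Override public void flush() { }
    @Override public void close() { }
  }

  /** Logs a message and hands the exception to the logger as a separate argument. */
  static LogRecord logWithThrowable(IOException e) {
    Logger logger = Logger.getAnonymousLogger();
    logger.setUseParentHandlers(false); // keep the demo output out of the console
    logger.setLevel(Level.FINE);        // FINE stands in for debug level here
    CapturingHandler handler = new CapturingHandler();
    logger.addHandler(handler);
    // The throwable travels with the record; formatters render its stack trace.
    logger.log(Level.FINE,
        "Getting output stream failed with expect header enabled, returning back", e);
    return handler.records.get(0);
  }
}
```

The record keeps the original exception object, so the backend decides how to render the stack trace and nothing is formatted when the level is disabled.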



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -229,14 +229,23 @@ private void completeExecute(TracingContext tracingContext)
       }
     }
 
-    if (result.getStatusCode() >= HttpURLConnection.HTTP_BAD_REQUEST) {

Review Comment:
   Please add a comment explaining why an exception is thrown for status < 200.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java:
##########
@@ -314,13 +327,46 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio
     if (this.isTraceEnabled) {
       startTime = System.nanoTime();
     }
-    try (OutputStream outputStream = this.connection.getOutputStream()) {
-      // update bytes sent before they are sent so we may observe
-      // attempted sends as well as successful sends via the
-      // accompanying statusCode
+    OutputStream outputStream = null;
+    // Updates the expected bytes to be sent based on length.
+    this.expectedBytesToBeSent = length;
+    try {
+      try {
+        /* Without expect header enabled, if getOutputStream() throws
+           an exception, it gets caught by the restOperation. But with
+           expect header enabled we return back without throwing an exception
+           for the correct response code processing.
+         */
+        outputStream = this.connection.getOutputStream();
+      } catch (IOException e) {
+        /* If getOutputStream fails with an exception and expect header
+           is enabled, we return back without throwing an exception to
+           the caller. The caller is responsible for setting the correct status code.
+           If expect header is not enabled, we throw back the exception.
+         */
+        String expectHeader = this.connection.getRequestProperty(EXPECT);
+        if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)) {
+          LOG.debug("Getting output stream failed with expect header enabled, returning back "
+                  + ExceptionUtils.getStackTrace(e));
+          return;

Review Comment:
   After the sendRequest method completes, AbfsRestOperation will trigger
   httpOperation.processResponse. If we have not sent the request, what do we
   expect from connection.getInputStream, connection.getResponseCode, and
   connection.getHeaderField?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -315,12 +324,20 @@ private boolean executeHttpOperation(final int retryCount,
       }
 
       if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) {
-        throw new InvalidAbfsRestOperationException(ex);
+        throw new InvalidAbfsRestOperationException(ex, retryCount);
       }
 
       return false;
     } finally {
-      intercept.updateMetrics(operationType, httpOperation);
+      int status = httpOperation.getStatusCode();
+      // If the socket is terminated prior to receiving a response, the HTTP

Review Comment:
   Please cite a source for the HTTP status being 0 or -1.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/InvalidAbfsRestOperationException.java:
##########
@@ -30,14 +30,33 @@
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class InvalidAbfsRestOperationException extends AbfsRestOperationException {
+
+  private static final String ERROR_MESSAGE = "InvalidAbfsRestOperationException";
+
   public InvalidAbfsRestOperationException(
       final Exception innerException) {
     super(
         AzureServiceErrorCode.UNKNOWN.getStatusCode(),
         AzureServiceErrorCode.UNKNOWN.getErrorCode(),
         innerException != null
             ? innerException.toString()
-            : "InvalidAbfsRestOperationException",
+            : ERROR_MESSAGE,

Review Comment:
   Should we have this.getClass().getName()?
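
One detail worth noting, as a hedged sketch rather than the PR's code: Java does not allow `this` to be referenced inside the arguments of an explicit `super(...)` call, so `this.getClass().getName()` would not compile at that position. A class literal avoids the hard-coded string while remaining usable there. The two classes below are hypothetical stand-ins for the real exception hierarchy.

```java
// Hypothetical stand-ins for the exception hierarchy; not the PR's classes.
class BaseOperationException extends Exception {
  BaseOperationException(String message) {
    super(message);
  }
}

class InvalidOperationException extends BaseOperationException {
  // A class literal works in a static initializer and inside super(...),
  // whereas this.getClass() cannot be used before super(...) has returned.
  private static final String ERROR_MESSAGE =
      InvalidOperationException.class.getSimpleName();

  InvalidOperationException(Exception innerException) {
    super(innerException != null ? innerException.toString() : ERROR_MESSAGE);
  }
}
```

`getSimpleName()` yields the bare class name; `getName()` would additionally include the package, which changes the message text compared with the current literal.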



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java:
##########
@@ -314,13 +327,46 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio
     if (this.isTraceEnabled) {
       startTime = System.nanoTime();
     }
-    try (OutputStream outputStream = this.connection.getOutputStream()) {
-      // update bytes sent before they are sent so we may observe
-      // attempted sends as well as successful sends via the
-      // accompanying statusCode
+    OutputStream outputStream = null;
+    // Updates the expected bytes to be sent based on length.
+    this.expectedBytesToBeSent = length;
+    try {
+      try {
+        /* Without expect header enabled, if getOutputStream() throws
+           an exception, it gets caught by the restOperation. But with
+           expect header enabled we return back without throwing an exception
+           for the correct response code processing.
+         */
+        outputStream = this.connection.getOutputStream();
+      } catch (IOException e) {
+        /* If getOutputStream fails with an exception and expect header
+           is enabled, we return back without throwing an exception to
+           the caller. The caller is responsible for setting the correct status code.
+           If expect header is not enabled, we throw back the exception.
+         */
+        String expectHeader = this.connection.getRequestProperty(EXPECT);
+        if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)) {
+          LOG.debug("Getting output stream failed with expect header enabled, returning back "
+                  + ExceptionUtils.getStackTrace(e));
+          return;
+        } else {
+          LOG.debug("Getting output stream failed without expect header enabled, throwing exception "
+                  + ExceptionUtils.getStackTrace(e));

Review Comment:
   `LOG.debug("message", exceptionObj)` also prints the stack trace, so the
   explicit `ExceptionUtils.getStackTrace(e)` call is unnecessary.



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java:
##########
@@ -404,4 +431,152 @@ public static AbfsRestOperation getRestOp(AbfsRestOperationType type,
   public static AccessTokenProvider getAccessTokenProvider(AbfsClient client) {
     return client.getTokenProvider();
   }
+
+  /**
+   * Test helper method to get random bytes array.
+   * @param length The length of byte buffer.
+   * @return byte buffer.
+   */
+  private byte[] getRandomBytesArray(int length) {
+    final byte[] b = new byte[length];
+    new Random().nextBytes(b);
+    return b;
+  }
+
+  /**
+   * Test to verify that client retries append request without
+   * expect header enabled if append with expect header enabled fails
+   * with 4xx kind of error.
+   * @throws Exception
+   */
+  @Test
+  public void testExpectHundredContinue() throws Exception {
+    // Get the filesystem.
+    final AzureBlobFileSystem fs = getFileSystem();
+
+    final Configuration configuration = new Configuration();
+    configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
+    AbfsClient abfsClient = fs.getAbfsStore().getClient();
+
+    AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
+        configuration.get(FS_AZURE_ABFS_ACCOUNT_NAME));
+
+    // Update the configuration with reduced retry count and reduced backoff interval.
+    AbfsConfiguration abfsConfig
+        = TestAbfsConfigurationFieldsValidation.updateRetryConfigs(
+        abfsConfiguration,
+        REDUCED_RETRY_COUNT, REDUCED_BACKOFF_INTERVAL);
+
+    // Gets the client.
+    AbfsClient testClient = Mockito.spy(
+        TestAbfsClient.createTestClientFromCurrentContext(
+            abfsClient,
+            abfsConfig));
+
+    // Create the append request params with expect header enabled initially.
+    AppendRequestParameters appendRequestParameters
+        = new AppendRequestParameters(
+        BUFFER_OFFSET, BUFFER_OFFSET, BUFFER_LENGTH,
+        AppendRequestParameters.Mode.APPEND_MODE, false, null, true);
+
+    byte[] buffer = getRandomBytesArray(BUFFER_LENGTH);
+
+    // Create a test container to upload the data.
+    Path testPath = path(TEST_PATH);
+    fs.create(testPath);
+    String finalTestPath = testPath.toString()
+        .substring(testPath.toString().lastIndexOf("/"));
+
+    // Creates a list of request headers.
+    final List<AbfsHttpHeader> requestHeaders
+        = TestAbfsClient.getTestRequestHeaders(testClient);
+    requestHeaders.add(
+        new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH));
+    if (appendRequestParameters.isExpectHeaderEnabled()) {
+      requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
+    }
+
+    // Updates the query parameters.
+    final AbfsUriQueryBuilder abfsUriQueryBuilder
+        = testClient.createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION,
+        Long.toString(appendRequestParameters.getPosition()));
+
+    // Creates the url for the specified path.
+    URL url = testClient.createRequestUrl(finalTestPath, abfsUriQueryBuilder.toString());
+
+    // Create a mock of the AbfsRestOperation to set the urlConnection in the corresponding httpOperation.
+    AbfsRestOperation op = Mockito.spy(new AbfsRestOperation(
+        AbfsRestOperationType.Append,
+        testClient,
+        HTTP_METHOD_PUT,
+        url,
+        requestHeaders, buffer,
+        appendRequestParameters.getoffset(),
+        appendRequestParameters.getLength(), null));
+
+    AbfsHttpOperation abfsHttpOperation = new AbfsHttpOperation(url,
+        HTTP_METHOD_PUT, requestHeaders);
+
+    // Create a mock of UrlConnection class.
+    HttpURLConnection urlConnection = mock(HttpURLConnection.class);
+
+    // Sets the expect request property if expect header is enabled.
+    if (appendRequestParameters.isExpectHeaderEnabled()) {
+      Mockito.doReturn(HUNDRED_CONTINUE).when(urlConnection)
+          .getRequestProperty(EXPECT);
+    }
+    Mockito.doNothing().when(urlConnection).setRequestProperty(Mockito
+        .any(), Mockito.any());
+    Mockito.doReturn(url).when(urlConnection).getURL();
+
+    // Give user error code 404 when processResponse is called.
+    Mockito.doReturn(HTTP_METHOD_PUT).when(urlConnection).getRequestMethod();
+    Mockito.doReturn(HTTP_NOT_FOUND).when(urlConnection).getResponseCode();
+    Mockito.doReturn("Resource Not Found")
+        .when(urlConnection)
+        .getResponseMessage();
+
+    // Make the getOutputStream throw IOException to see it returns from the sendRequest correctly.
+    Mockito.doThrow(new ProtocolException("Server rejected Operation"))
+        .when(urlConnection)
+        .getOutputStream();
+    abfsHttpOperation.setConnection(urlConnection);

Review Comment:
   Let's not have a setter for connection in abfsHttpOperation. What we can do
   is:
   
   1. Make abfsHttpOperation a spied object.
   2. Add package-private methods to AbfsHttpOperation,
   getConnectionRequestMethod(), getConnectionResponseCode(), and
   getConnectionResponseMessage(), which in the real object call the
   corresponding connection method. The spied object would have the mocked
   behaviour:
    - getConnectionResponseCode: return HTTP_NOT_FOUND.
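
The idea above could look roughly like the following sketch. To stay dependency-free it uses a subclass override where the test would use a Mockito spy; the classes are simplified, hypothetical stand-ins and only the method name `getConnectionResponseCode()` is taken from the suggestion.

```java
import java.io.IOException;
import java.net.HttpURLConnection;

// Hypothetical, simplified stand-in for AbfsHttpOperation; not the PR's code.
class HttpOperationSketch {
  private final HttpURLConnection connection;
  private int statusCode = -1;

  HttpOperationSketch(HttpURLConnection connection) {
    this.connection = connection;
  }

  // Package-private seam: production code delegates to the connection,
  // while a test double overrides (or a Mockito spy stubs) this method.
  int getConnectionResponseCode() throws IOException {
    return connection.getResponseCode();
  }

  // Reads the status only through the seam, never from the connection directly.
  void processResponse() throws IOException {
    statusCode = getConnectionResponseCode();
  }

  int getStatusCode() {
    return statusCode;
  }
}

// Test double playing the role of the spied object; no connection is needed.
class NotFoundHttpOperation extends HttpOperationSketch {
  NotFoundHttpOperation() {
    super(null);
  }

  @Override
  int getConnectionResponseCode() {
    return HttpURLConnection.HTTP_NOT_FOUND;
  }
}
```

With this shape the test never has to inject an `HttpURLConnection`, so no setter is required on the production class.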



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java:
##########
@@ -110,6 +115,10 @@ protected  HttpURLConnection getConnection() {
     return connection;
   }
 
+  void setConnection(final HttpURLConnection connection) {
+    this.connection = connection;
+  }
+

Review Comment:
   As suggested in TestAbfsClient, let's remove it.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java:
##########
@@ -126,7 +132,7 @@ public ExponentialRetryPolicy(final int retryCount, final int minBackoff, final
    */
   public boolean shouldRetry(final int retryCount, final int statusCode) {
     return retryCount < this.retryCount
-        && (statusCode == -1
+        && (statusCode < HTTP_CONTINUE

Review Comment:
   Please cite a reference for why we are retrying on a range of statuses.
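
To make the scope of the change concrete, here is a minimal sketch comparing only the clause visible in the hunk; the rest of the predicate is cut off in the diff and is deliberately not reproduced. The class is hypothetical, and `HTTP_CONTINUE` is assumed to be 100, the standard HTTP status value.

```java
// Hypothetical sketch of the visible clause only; not ExponentialRetryPolicy.
class RetryClauseSketch {
  static final int HTTP_CONTINUE = 100; // assumed value

  /** Old visible clause: only the sentinel -1 qualified. */
  static boolean oldClause(int statusCode) {
    return statusCode == -1;
  }

  /** New visible clause: every value below 100 qualifies. */
  static boolean newClause(int statusCode) {
    return statusCode < HTTP_CONTINUE;
  }
}
```

Under this reading, a status of 0 did not satisfy the old clause but satisfies the new one, while 100 and above are unaffected by this clause.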





> ABFS: Add changes for expect hundred continue header with append requests
> -------------------------------------------------------------------------
>
>                 Key: HADOOP-18146
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18146
>             Project: Hadoop Common
>          Issue Type: Sub-task
>    Affects Versions: 3.3.1
>            Reporter: Anmol Asrani
>            Assignee: Anmol Asrani
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
>  Heavy load from a Hadoop cluster leads to high resource utilization at FE 
> nodes. Investigations from the server side indicate payload buffering at 
> Http.Sys as the cause. Payloads of requests that eventually fail due to 
> throttling limits are also buffered, as it is triggered before FE can 
> start request processing.
> Approach: The client sends the Append HTTP request with the Expect header, but 
> holds back on payload transmission until the server replies with HTTP 100. We 
> add this header for all append requests so as to reduce.
> We made several workload runs with and without hundred continue enabled and 
> the overall observation is that:
>  # The ratio of TCP SYN packet count with and without expect hundred continue 
> enabled is 0.32 : 3 on average.
>  # The ingress into the machine at TCP level is almost 3 times lower with 
> hundred continue enabled, which implies significant bandwidth savings.
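
As a rough illustration of the approach described above, the sketch below prepares (but does not send) a PUT request carrying the Expect header using the JDK's `HttpURLConnection`. The class name, helper name, and URL are placeholders, not taken from the ABFS client.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;

// Hypothetical sketch; class name and URL are placeholders.
class ExpectHeaderSketch {

  /** Builds an unconnected PUT request that asks the server for a 100 Continue. */
  static HttpURLConnection prepareAppend(String url, int payloadLength)
      throws IOException {
    // openConnection() only creates the object; no socket is opened yet.
    HttpURLConnection conn =
        (HttpURLConnection) URI.create(url).toURL().openConnection();
    conn.setRequestMethod("PUT");
    conn.setDoOutput(true);
    // Stream the body with a known length instead of buffering it in memory.
    conn.setFixedLengthStreamingMode(payloadLength);
    // Ask the server to confirm before the payload is transmitted.
    conn.setRequestProperty("Expect", "100-continue");
    return conn;
  }
}
```

The payload would only be written after obtaining the output stream; as the diff in this thread shows, that call can fail with an `IOException` when the server rejects the request up front.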



