This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new c7285804fdee HADOOP-18883. [ABFS]: Expect-100 JDK bug resolution: 
prevent multiple server calls (#6022)
c7285804fdee is described below

commit c7285804fdeedd0f2003084ea1f73077c6fefdc1
Author: Pranav Saxena <108325433+saxenapra...@users.noreply.github.com>
AuthorDate: Thu Jan 25 14:24:01 2024 -0800

    HADOOP-18883. [ABFS]: Expect-100 JDK bug resolution: prevent multiple 
server calls (#6022)
    
    
    
    Address JDK bug JDK-8314978 related to handling of HTTP 100
    responses.
    
    https://bugs.openjdk.org/browse/JDK-8314978
    
    In the AbfsHttpOperation, after sendRequest() we call processResponse()
    method from AbfsRestOperation.
    Even if the conn.getOutputStream() fails due to an expect-100 error,
    we consume the exception and let the code go ahead.
    This may call getHeaderField() / getHeaderFields() / getHeaderFieldLong() 
after
    getOutputStream() has failed. These invocations all lead to server calls.
    
    This commit aims to prevent this.
    If connection.getOutputStream() fails due to an Expect-100 error,
    the ABFS client does not invoke getHeaderField(), getHeaderFields(),
    getHeaderFieldLong() or getInputStream().
    
    getResponseCode() is safe because, on failure, it sets the
    responseCode variable in the HttpURLConnection object.
    
    Contributed by Pranav Saxena
---
 .../fs/azurebfs/constants/AbfsHttpConstants.java   |  1 +
 .../fs/azurebfs/services/AbfsHttpOperation.java    | 46 +++++++++++++---
 .../fs/azurebfs/services/AbfsOutputStream.java     |  9 +++-
 .../fs/azurebfs/services/ITestAbfsClient.java      |  3 +-
 .../azurebfs/services/ITestAbfsOutputStream.java   | 61 ++++++++++++++++++++++
 .../azurebfs/services/ITestAbfsRestOperation.java  |  3 +-
 6 files changed, 112 insertions(+), 11 deletions(-)

diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
index 91f6bddcc1d4..63de71eb178d 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
@@ -69,6 +69,7 @@ public final class AbfsHttpConstants {
    * and should qualify for retry.
    */
   public static final int HTTP_CONTINUE = 100;
+  public static final String EXPECT_100_JDK_ERROR = "Server rejected 
operation";
 
   // Abfs generic constants
   public static final String SINGLE_WHITE_SPACE = " ";
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
index a47720ab6972..9a4acfdccff1 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
@@ -22,12 +22,17 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.HttpURLConnection;
+import java.net.ProtocolException;
 import java.net.URL;
 import java.util.List;
 
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLSocketFactory;
 
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
+import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
+
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
@@ -35,13 +40,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
-import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
 
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
 
@@ -84,6 +88,7 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
   private long sendRequestTimeMs;
   private long recvResponseTimeMs;
   private boolean shouldMask = false;
+  private boolean connectionDisconnectedOnError = false;
 
   public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult(
       final URL url,
@@ -333,14 +338,26 @@ public class AbfsHttpOperation implements 
AbfsPerfLoggable {
          */
         outputStream = getConnOutputStream();
       } catch (IOException e) {
-        /* If getOutputStream fails with an exception and expect header
-           is enabled, we return back without throwing an exception to
-           the caller. The caller is responsible for setting the correct 
status code.
-           If expect header is not enabled, we throw back the exception.
+        connectionDisconnectedOnError = true;
+        /* If getOutputStream fails with an expect-100 exception , we return 
back
+           without throwing an exception to the caller. Else, we throw back 
the exception.
          */
         String expectHeader = getConnProperty(EXPECT);
-        if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)) {
+        if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)
+            && e instanceof ProtocolException
+            && EXPECT_100_JDK_ERROR.equals(e.getMessage())) {
           LOG.debug("Getting output stream failed with expect header enabled, 
returning back ", e);
+          /*
+           * In case expect-100 assertion has failed, headers and inputStream 
should not
+           * be parsed. Reason being, conn.getHeaderField(), 
conn.getHeaderFields(),
+           * conn.getInputStream() will lead to repeated server call.
+           * ref: https://bugs.openjdk.org/browse/JDK-8314978.
+           * Reading conn.responseCode() and conn.getResponseMessage() is safe 
in
+           * case of Expect-100 error. Reason being, in JDK, it stores the 
responseCode
+           * in the HttpUrlConnection object before throwing exception to the 
caller.
+           */
+          this.statusCode = getConnResponseCode();
+          this.statusDescription = getConnResponseMessage();
           return;
         } else {
           LOG.debug("Getting output stream failed without expect header 
enabled, throwing exception ", e);
@@ -375,7 +392,17 @@ public class AbfsHttpOperation implements AbfsPerfLoggable 
{
    * @throws IOException if an error occurs.
    */
   public void processResponse(final byte[] buffer, final int offset, final int 
length) throws IOException {
+    if (connectionDisconnectedOnError) {
+      LOG.debug("This connection was not successful or has been disconnected, "
+          + "hence not parsing headers and inputStream");
+      return;
+    }
+    processConnHeadersAndInputStreams(buffer, offset, length);
+  }
 
+  void processConnHeadersAndInputStreams(final byte[] buffer,
+      final int offset,
+      final int length) throws IOException {
     // get the response
     long startTime = 0;
     if (this.isTraceEnabled) {
@@ -633,6 +660,11 @@ public class AbfsHttpOperation implements AbfsPerfLoggable 
{
     return connection.getResponseMessage();
   }
 
+  @VisibleForTesting
+  Boolean getConnectionDisconnectedOnError() {
+    return connectionDisconnectedOnError;
+  }
+
   public static class AbfsHttpOperationWithFixedResult extends 
AbfsHttpOperation {
     /**
      * Creates an instance to represent fixed results.
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 67370d7a002b..f9b59df672e0 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -338,7 +338,7 @@ public class AbfsOutputStream extends OutputStream 
implements Syncable,
              */
             AppendRequestParameters reqParams = new AppendRequestParameters(
                 offset, 0, bytesLength, mode, false, leaseId, 
isExpectHeaderEnabled);
-            AbfsRestOperation op = client.append(path,
+            AbfsRestOperation op = getClient().append(path,
                 blockUploadData.toByteArray(), reqParams, cachedSasToken.get(),
                 contextEncryptionAdapter, new TracingContext(tracingContext));
             cachedSasToken.update(op.getSasToken());
@@ -655,7 +655,7 @@ public class AbfsOutputStream extends OutputStream 
implements Syncable,
     AbfsPerfTracker tracker = client.getAbfsPerfTracker();
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
             "flushWrittenBytesToServiceInternal", "flush")) {
-      AbfsRestOperation op = client.flush(path, offset, retainUncommitedData,
+      AbfsRestOperation op = getClient().flush(path, offset, 
retainUncommitedData,
           isClose, cachedSasToken.get(), leaseId, contextEncryptionAdapter,
           new TracingContext(tracingContext));
       cachedSasToken.update(op.getSasToken());
@@ -795,4 +795,9 @@ public class AbfsOutputStream extends OutputStream 
implements Syncable,
   ListeningExecutorService getExecutorService() {
     return executorService;
   }
+
+  @VisibleForTesting
+  AbfsClient getClient() {
+    return client;
+  }
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
index 195320d58546..e688487a110e 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
@@ -48,6 +48,7 @@ import 
org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
 
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
@@ -549,7 +550,7 @@ public final class ITestAbfsClient extends 
AbstractAbfsIntegrationTest {
         .getConnResponseMessage();
 
     // Make the getOutputStream throw IOException to see it returns from the 
sendRequest correctly.
-    Mockito.doThrow(new ProtocolException("Server rejected Operation"))
+    Mockito.doThrow(new ProtocolException(EXPECT_100_JDK_ERROR))
         .when(abfsHttpOperation)
         .getConnOutputStream();
 
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
index eee0c177c33b..de804245da7c 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
@@ -18,15 +18,19 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URL;
 
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
@@ -34,6 +38,8 @@ import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
 import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
 import org.apache.hadoop.test.LambdaTestUtils;
 
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED;
+
 /**
  * Test create operation.
  */
@@ -148,6 +154,61 @@ public class ITestAbfsOutputStream extends 
AbstractAbfsIntegrationTest {
     }
   }
 
+  @Test
+  public void testExpect100ContinueFailureInAppend() throws Exception {
+    Configuration configuration = new Configuration(getRawConfiguration());
+    configuration.set(FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED, "true");
+    AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
+        configuration);
+    Path path = new Path("/testFile");
+    AbfsOutputStream os = Mockito.spy(
+        (AbfsOutputStream) fs.create(path).getWrappedStream());
+    AbfsClient spiedClient = Mockito.spy(os.getClient());
+    AbfsHttpOperation[] httpOpForAppendTest = new AbfsHttpOperation[2];
+    mockSetupForAppend(httpOpForAppendTest, spiedClient);
+    Mockito.doReturn(spiedClient).when(os).getClient();
+    fs.delete(path, true);
+    os.write(1);
+    LambdaTestUtils.intercept(FileNotFoundException.class, () -> {
+      os.close();
+    });
+    
Assertions.assertThat(httpOpForAppendTest[0].getConnectionDisconnectedOnError())
+        .describedAs("First try from AbfsClient will have expect-100 "
+            + "header and should fail with expect-100 error.").isTrue();
+    Mockito.verify(httpOpForAppendTest[0], Mockito.times(0))
+        .processConnHeadersAndInputStreams(Mockito.any(byte[].class),
+            Mockito.anyInt(), Mockito.anyInt());
+
+    
Assertions.assertThat(httpOpForAppendTest[1].getConnectionDisconnectedOnError())
+        .describedAs("The retried operation from AbfsClient should not "
+            + "fail with expect-100 error. The retried operation does not have"
+            + "expect-100 header.").isFalse();
+    Mockito.verify(httpOpForAppendTest[1], Mockito.times(1))
+        .processConnHeadersAndInputStreams(Mockito.any(byte[].class),
+            Mockito.anyInt(), Mockito.anyInt());
+  }
+
+  private void mockSetupForAppend(final AbfsHttpOperation[] 
httpOpForAppendTest,
+      final AbfsClient spiedClient) {
+    int[] index = new int[1];
+    index[0] = 0;
+    Mockito.doAnswer(abfsRestOpAppendGetInvocation -> {
+          AbfsRestOperation op = Mockito.spy(
+              (AbfsRestOperation) 
abfsRestOpAppendGetInvocation.callRealMethod());
+          Mockito.doAnswer(createHttpOpInvocation -> {
+            httpOpForAppendTest[index[0]] = Mockito.spy(
+                (AbfsHttpOperation) createHttpOpInvocation.callRealMethod());
+            return httpOpForAppendTest[index[0]++];
+          }).when(op).createHttpOperation();
+          return op;
+        })
+        .when(spiedClient)
+        
.getAbfsRestOperationForAppend(Mockito.any(AbfsRestOperationType.class),
+            Mockito.anyString(), Mockito.any(
+                URL.class), Mockito.anyList(), Mockito.any(byte[].class),
+            Mockito.anyInt(), Mockito.anyInt(), 
Mockito.nullable(String.class));
+  }
+
   /**
    * Separate method to create an outputStream using a local FS instance so
    * that once this method has returned, the FS instance can be eligible for 
GC.
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java
index 6ffe2e2773bb..1532e74ac10d 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java
@@ -49,6 +49,7 @@ import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
 import static java.net.HttpURLConnection.HTTP_OK;
 import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
@@ -232,7 +233,7 @@ public class ITestAbfsRestOperation extends 
AbstractAbfsIntegrationTest {
       Mockito.doReturn(responseMessage)
           .when(abfsHttpOperation)
           .getConnResponseMessage();
-      Mockito.doThrow(new ProtocolException("Server rejected Operation"))
+      Mockito.doThrow(new ProtocolException(EXPECT_100_JDK_ERROR))
           .when(abfsHttpOperation)
           .getConnOutputStream();
       break;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to