This is an automated email from the ASF dual-hosted git repository.

anujmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0dac3d20503 HADOOP-19531. [ABFS][FnsOverBlob] Streaming List Path Result Should Happen Inside Retry Loop (#7582)
0dac3d20503 is described below

commit 0dac3d20503b34483564c235bba76a2ba97b3800
Author: Anuj Modi <anujmodi2...@gmail.com>
AuthorDate: Tue Apr 8 12:59:21 2025 +0530

    HADOOP-19531. [ABFS][FnsOverBlob] Streaming List Path Result Should Happen Inside Retry Loop (#7582)
    
    Contributed by Anuj Modi
    Reviewed by Anmol Asrani, Manish Bhatt, Manika Joshi
    
    Signed off by: Anuj Modi <anujm...@apache.org>
---
 .../fs/azurebfs/services/AbfsAHCHttpOperation.java | 28 ++++++++++----
 .../fs/azurebfs/services/AbfsBlobClient.java       | 31 +++++++--------
 .../hadoop/fs/azurebfs/services/AbfsDfsClient.java | 45 ++++++++++++----------
 .../fs/azurebfs/services/AbfsHttpOperation.java    | 33 ++++++++++++++--
 .../ITestAzureBlobFileSystemListStatus.java        | 38 +++++++++++++++++-
 .../services/ITestApacheClientConnectionPool.java  |  2 +
 .../fs/azurebfs/services/TestAbfsClient.java       |  1 +
 7 files changed, 129 insertions(+), 49 deletions(-)
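
In short: the ListBlobs/ListPath response used to be handed back as a live network stream and parsed only after the retry loop had already returned, so a connection reset or timeout hit during parsing could not be retried. This change buffers the complete response body while the request is still inside the retry loop, and parsing then works off that in-memory buffer. A minimal sketch of the buffering idea (illustrative only, not the exact Hadoop code; the class name, helper name, and 16 KB chunk size are made up here):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;

    final class ListResponseBuffering {
      // Drain the network stream fully into memory. Any IOException is
      // thrown here, while the caller is still inside its retry loop,
      // so the whole ListPath request can simply be retried.
      static InputStream bufferFully(InputStream network) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[16 * 1024];
        int n;
        while ((n = network.read(chunk)) != -1) {
          out.write(chunk, 0, n);
        }
        // Later parsing reads from memory and cannot hit a network error.
        return new ByteArrayInputStream(out.toByteArray());
      }
    }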

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java
index 2569c6a4bd3..cc460a24af6 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java
@@ -38,6 +38,7 @@
 import org.apache.http.Header;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpDelete;
 import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
 import org.apache.http.client.methods.HttpGet;
@@ -47,6 +48,7 @@
 import org.apache.http.client.methods.HttpPut;
 import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.util.EntityUtils;
 
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
@@ -192,14 +194,26 @@ String getConnResponseMessage() throws IOException {
   public void processResponse(final byte[] buffer,
       final int offset,
       final int length) throws IOException {
-    if (!isPayloadRequest) {
-      prepareRequest();
-      LOG.debug("Sending request: {}", httpRequestBase);
-      httpResponse = executeRequest();
-      LOG.debug("Request sent: {}; response {}", httpRequestBase,
-          httpResponse);
+    try {
+      if (!isPayloadRequest) {
+        prepareRequest();
+        LOG.debug("Sending request: {}", httpRequestBase);
+        httpResponse = executeRequest();
+        LOG.debug("Request sent: {}; response {}", httpRequestBase,
+            httpResponse);
+      }
+      parseResponseHeaderAndBody(buffer, offset, length);
+    } finally {
+      if (httpResponse != null) {
+        try {
+          EntityUtils.consume(httpResponse.getEntity());
+        } finally {
+          if (httpResponse instanceof CloseableHttpResponse) {
+            ((CloseableHttpResponse) httpResponse).close();
+          }
+        }
+      }
     }
-    parseResponseHeaderAndBody(buffer, offset, length);
   }
 
   /**
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
index 32eba86774a..c41df10d425 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
@@ -1605,12 +1605,9 @@ public Hashtable<String, String> getXMSProperties(AbfsHttpOperation result)
   @Override
   public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri)
       throws AzureBlobFileSystemException {
-    BlobListResultSchema listResultSchema;
     try (InputStream stream = result.getListResultStream()) {
-      if (stream == null) {
-        return null;
-      }
       try {
+        BlobListResultSchema listResultSchema;
         final SAXParser saxParser = saxParserThreadLocal.get();
         saxParser.reset();
         listResultSchema = new BlobListResultSchema();
@@ -1620,19 +1617,17 @@ public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri)
         LOG.debug("ListBlobs listed {} blobs with {} as continuation token",
             listResultSchema.paths().size(),
             listResultSchema.getNextMarker());
-      } catch (SAXException | IOException e) {
-        throw new AbfsDriverException(e);
+        return filterDuplicateEntriesAndRenamePendingFiles(listResultSchema, uri);
+      } catch (SAXException | IOException ex) {
+        throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, ex);
       }
-    } catch (IOException e) {
-      LOG.error("Unable to deserialize list results for uri {}", 
uri.toString(), e);
-      throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, e);
-    }
-
-    try {
-      return filterDuplicateEntriesAndRenamePendingFiles(listResultSchema, uri);
-    } catch (IOException e) {
-      LOG.error("Unable to filter list results for uri {}", uri.toString(), e);
-      throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, e);
+    } catch (AbfsDriverException ex) {
+      // Throw as it is to avoid multiple wrapping.
+      LOG.error("Unable to deserialize list results for Uri {}", uri != null ? 
uri.toString(): "NULL", ex);
+      throw ex;
+    } catch (Exception ex) {
+      LOG.error("Unable to get stream for list results for uri {}", uri != 
null ? uri.toString(): "NULL", ex);
+      throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, ex);
     }
   }
 
@@ -1929,8 +1924,10 @@ private List<AbfsHttpHeader> getMetadataHeadersList(final Hashtable<String, Stri
    * @param listResultSchema List of entries returned by Blob Endpoint.
    * @param uri URI to be used for path conversion.
    * @return List of entries after removing duplicates.
+   * @throws IOException if path conversion fails.
    */
-  private ListResponseData filterDuplicateEntriesAndRenamePendingFiles(
+  @VisibleForTesting
+  public ListResponseData filterDuplicateEntriesAndRenamePendingFiles(
       BlobListResultSchema listResultSchema, URI uri) throws IOException {
     List<FileStatus> fileStatuses = new ArrayList<>();
     Map<Path, Integer> renamePendingJsonPaths = new HashMap<>();
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
index ef4194179dc..0e988162a08 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
@@ -38,8 +38,10 @@
 import java.util.UUID;
 
 import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.hadoop.classification.VisibleForTesting;
@@ -1476,33 +1478,36 @@ public Hashtable<String, String> getXMSProperties(AbfsHttpOperation result)
    * @throws AzureBlobFileSystemException if parsing fails.
    */
   @Override
-  public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri) throws AzureBlobFileSystemException {
-    try (InputStream listResultInputStream = result.getListResultStream()) {
-      DfsListResultSchema listResultSchema;
+  public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri)
+      throws AzureBlobFileSystemException {
+    try (InputStream stream = result.getListResultStream()) {
       try {
+        DfsListResultSchema listResultSchema;
         final ObjectMapper objectMapper = new ObjectMapper();
-        listResultSchema = objectMapper.readValue(listResultInputStream,
-            DfsListResultSchema.class);
+        listResultSchema = objectMapper.readValue(stream, DfsListResultSchema.class);
         result.setListResultSchema(listResultSchema);
         LOG.debug("ListPath listed {} paths with {} as continuation token",
             listResultSchema.paths().size(),
             getContinuationFromResponse(result));
-      } catch (IOException ex) {
-        throw new AbfsDriverException(ex);
-      }
-
-      List<FileStatus> fileStatuses = new ArrayList<>();
-      for (DfsListResultEntrySchema entry : listResultSchema.paths()) {
-        fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
+        List<FileStatus> fileStatuses = new ArrayList<>();
+        for (DfsListResultEntrySchema entry : listResultSchema.paths()) {
+          fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
+        }
+        ListResponseData listResponseData = new ListResponseData();
+        listResponseData.setFileStatusList(fileStatuses);
+        listResponseData.setRenamePendingJsonPaths(null);
+        listResponseData.setContinuationToken(
+            getContinuationFromResponse(result));
+        return listResponseData;
+      } catch (JsonParseException | JsonMappingException ex) {
+        throw new AbfsDriverException(ERR_DFS_LIST_PARSING, ex);
       }
-      ListResponseData listResponseData = new ListResponseData();
-      listResponseData.setFileStatusList(fileStatuses);
-      listResponseData.setRenamePendingJsonPaths(null);
-      listResponseData.setContinuationToken(
-          getContinuationFromResponse(result));
-      return listResponseData;
-    } catch (IOException ex) {
-      LOG.error("Unable to deserialize list results for Uri {}", 
uri.toString(), ex);
+    } catch (AbfsDriverException ex) {
+      // Throw as it is to avoid multiple wrapping.
+      LOG.error("Unable to deserialize list results for Uri {}", uri != null ? 
uri.toString(): "NULL", ex);
+      throw ex;
+    } catch (Exception ex) {
+      LOG.error("Unable to deserialize list results for Uri {}", uri != null ? 
uri.toString(): "NULL", ex);
       throw new AbfsDriverException(ERR_DFS_LIST_PARSING, ex);
     }
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
index 89c97b68baa..81e33c5c222 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
@@ -221,7 +223,7 @@ public ListResultSchema getListResultSchema() {
     return listResultSchema;
   }
 
-  public final InputStream getListResultStream() {
+  public InputStream getListResultStream() {
     return listResultStream;
   }
 
@@ -396,8 +398,7 @@ final void parseResponse(final byte[] buffer,
       // consume the input stream to release resources
       int totalBytesRead = 0;
 
-      try {
-        InputStream stream = getContentInputStream();
+      try (InputStream stream = getContentInputStream()) {
         if (isNullInputStream(stream)) {
           return;
         }
@@ -409,7 +410,7 @@ final void parseResponse(final byte[] buffer,
           if (url.toString().contains(QUERY_PARAM_COMP + EQUAL + BLOCKLIST)) {
             parseBlockListResponse(stream);
           } else {
-            listResultStream = stream;
+            parseListPathResponse(stream);
           }
         } else {
           if (buffer != null) {
@@ -438,6 +439,11 @@ final void parseResponse(final byte[] buffer,
             method, getMaskedUrl(), ex.getMessage());
         log.debug("IO Error: ", ex);
         throw ex;
+      } catch (Exception ex) {
+        log.warn("Unexpected error: {} {}: {}",
+            method, getMaskedUrl(), ex.getMessage());
+        log.debug("Unexpected Error: ", ex);
+        throw new IOException(ex);
       } finally {
         this.recvResponseTimeMs += elapsedTimeMs(startTime);
         this.bytesReceived = totalBytesRead;
@@ -500,6 +506,25 @@ private void parseBlockListResponse(final InputStream stream) throws IOException
     blockIdList = client.parseBlockListResponse(stream);
   }
 
+  /**
+   * Parse the list path response from the network stream and save response into a buffer.
+   * @param stream Network InputStream.
+   * @throws IOException if an error occurs while reading the stream.
+   */
+  private void parseListPathResponse(final InputStream stream) throws IOException {
+    if (stream == null || listResultStream != null) {
+      return;
+    }
+    try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
+      byte[] tempBuffer = new byte[CLEAN_UP_BUFFER_SIZE];
+      int bytesRead;
+      while ((bytesRead = stream.read(tempBuffer, 0, CLEAN_UP_BUFFER_SIZE)) != -1) {
+        buffer.write(tempBuffer, 0, bytesRead);
+      }
+      listResultStream = new ByteArrayInputStream(buffer.toByteArray());
+    }
+  }
+
   public List<String> getBlockIdList() {
     return blockIdList;
   }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java
index 7f7d96bd477..577b44cdb65 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java
@@ -20,6 +20,7 @@
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.net.URL;
 import java.util.ArrayList;
@@ -42,6 +43,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler;
@@ -63,7 +65,10 @@
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_LIST_MAX_RESULTS;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_METADATA_PREFIX;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_BLOB_LIST_PARSING;
 import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
+import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_MESSAGE;
+import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_JDK_MESSAGE;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
@@ -130,7 +135,9 @@ public Void call() throws Exception {
 
   /**
   * Test to verify that each paginated call to ListBlobs uses a new tracing context.
-   * @throws Exception
+   * Test also verifies that the retry policy is called when a SocketTimeoutException occurs.
+   * Test also verifies that an empty list with a valid continuation token is handled.
+   * @throws Exception if there is an error or a test assertion fails.
    */
   @Test
   public void testListPathTracingContext() throws Exception {
@@ -160,6 +167,10 @@ public void testListPathTracingContext() throws Exception {
     List<FileStatus> fileStatuses = new ArrayList<>();
     spiedStore.listStatus(new Path("/"), "", fileStatuses, true, null, spiedTracingContext);
 
+    // Assert that there were retries due to SocketTimeoutException
+    Mockito.verify(spiedClient, Mockito.times(1))
+        .getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION);
+
     // Assert that 2 paginated ListPath calls were made: 1 and 2.
     // 1. Without continuation token
     Mockito.verify(spiedClient, times(1)).listPath(
@@ -176,6 +187,31 @@ public void testListPathTracingContext() throws Exception {
     Mockito.verify(spiedTracingContext, times(0)).constructHeader(any(), any(), any());
   }
 
+  @Test
+  public void testListPathParsingFailure() throws Exception {
+    assumeBlobServiceType();
+    AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem());
+    AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore());
+    AbfsBlobClient spiedClient = Mockito.spy(spiedStore.getClientHandler()
+        .getBlobClient());
+    Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
+    Mockito.doReturn(spiedClient).when(spiedStore).getClient();
+
+    Mockito.doThrow(new SocketException(CONNECTION_RESET_MESSAGE)).when(spiedClient).filterDuplicateEntriesAndRenamePendingFiles(any(), any());
+    List<FileStatus> fileStatuses = new ArrayList<>();
+    AbfsDriverException ex = intercept(AbfsDriverException.class,
+      () -> {
+        spiedStore.listStatus(new Path("/"), "", fileStatuses,
+            true, null, getTestTracingContext(spiedFs, true));
+      });
+    Assertions.assertThat(ex.getStatusCode())
+        .describedAs("Expecting Network Error status code")
+        .isEqualTo(-1);
+    Assertions.assertThat(ex.getErrorMessage())
+        .describedAs("Expecting COPY_ABORTED error code")
+        .contains(ERR_BLOB_LIST_PARSING);
+  }
+
   /**
    * Creates a file, verifies that listStatus returns it,
    * even while the file is still open for writing.
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java
index 4c4a748b72e..9ff37332ad7 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java
@@ -45,6 +45,7 @@
 
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KEEP_ALIVE_CACHE_CLOSED;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_NETWORKING_LIBRARY;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpOperationType.APACHE_HTTP_CLIENT;
@@ -67,6 +68,7 @@ public ITestApacheClientConnectionPool() throws Exception {
   public void testKacIsClosed() throws Throwable {
     Configuration configuration = new Configuration(getRawConfiguration());
     configuration.set(FS_AZURE_NETWORKING_LIBRARY, APACHE_HTTP_CLIENT.name());
+    configuration.unset(FS_AZURE_METRIC_FORMAT);
     try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
         configuration)) {
       KeepAliveCache kac = fs.getAbfsStore().getClientHandler().getIngressClient()
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
index 2495e50f434..274230e4b38 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
@@ -57,6 +57,7 @@ public class TestAbfsClient {
     public void testTimerInitializationWithoutMetricCollection() throws Exception {
         final Configuration configuration = new Configuration();
         AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration, ACCOUNT_NAME);
+        abfsConfiguration.unset(FS_AZURE_METRIC_FORMAT);
 
         AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
         AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().withAbfsCounters(abfsCounters).build();
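
A note on the AbfsAHCHttpOperation change at the top: with the Apache HttpClient connection pool, the response entity must be fully consumed and the response closed before the underlying connection can be reused, which is what the new finally block does now that the list payload is copied out eagerly. A rough sketch of that release pattern (standalone HttpClient usage; the class and method names here are illustrative):

    import java.io.IOException;

    import org.apache.http.HttpResponse;
    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.util.EntityUtils;

    final class ResponseRelease {
      // Drain any remaining entity bytes, then close the response, so the
      // pooled connection is released even if an earlier parse step failed.
      static void release(HttpResponse response) throws IOException {
        if (response == null) {
          return;
        }
        try {
          EntityUtils.consume(response.getEntity());
        } finally {
          if (response instanceof CloseableHttpResponse) {
            ((CloseableHttpResponse) response).close();
          }
        }
      }
    }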

