This is an automated email from the ASF dual-hosted git repository.

georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ef0232290cf Fix AzureStorage.batchDeleteFiles (#15730)
ef0232290cf is described below

commit ef0232290cf718e6d9db6e3627d26c4c80dfb4bd
Author: George Shiqi Wu <[email protected]>
AuthorDate: Wed Jan 24 14:37:58 2024 -0500

    Fix AzureStorage.batchDeleteFiles (#15730)
    
    * Fix param
    
    * Fix deleteBatchFiles
    
    * Fix unit tests
    
    * Add tests
---
 .../druid/storage/azure/AzureClientFactory.java    | 10 ++++++++
 .../apache/druid/storage/azure/AzureStorage.java   | 19 +++++++++-----
 .../azure/output/AzureStorageConnector.java        |  5 ++--
 .../druid/storage/azure/AzureStorageTest.java      | 29 ++++++++++++++++++++++
 .../azure/output/AzureStorageConnectorTest.java    | 27 ++++++++++++++++++++
 5 files changed, 82 insertions(+), 8 deletions(-)

diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java
index 3625eaa813a..d72fcd8395f 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java
@@ -22,8 +22,11 @@ package org.apache.druid.storage.azure;
 import com.azure.core.http.policy.ExponentialBackoffOptions;
 import com.azure.core.http.policy.RetryOptions;
 import com.azure.identity.DefaultAzureCredentialBuilder;
+import com.azure.storage.blob.BlobContainerClient;
 import com.azure.storage.blob.BlobServiceClient;
 import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.batch.BlobBatchClient;
+import com.azure.storage.blob.batch.BlobBatchClientBuilder;
 import com.azure.storage.common.StorageSharedKeyCredential;
 
 import java.time.Duration;
@@ -64,6 +67,13 @@ public class AzureClientFactory
     return cachedBlobServiceClients.get(retryCount);
   }
 
+
+  // Mainly here to make testing easier.
+  public BlobBatchClient getBlobBatchClient(BlobContainerClient 
blobContainerClient)
+  {
+    return new BlobBatchClientBuilder(blobContainerClient).buildClient();
+  }
+
   private BlobServiceClientBuilder getAuthenticatedBlobServiceClientBuilder()
   {
     BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder()
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
index aa7bef718cf..b929d255c1c 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
@@ -22,8 +22,6 @@ package org.apache.druid.storage.azure;
 import com.azure.core.http.rest.PagedIterable;
 import com.azure.storage.blob.BlobContainerClient;
 import com.azure.storage.blob.BlobServiceClient;
-import com.azure.storage.blob.batch.BlobBatchClient;
-import com.azure.storage.blob.batch.BlobBatchClientBuilder;
 import com.azure.storage.blob.batch.BlobBatchStorageException;
 import com.azure.storage.blob.models.BlobItem;
 import com.azure.storage.blob.models.BlobRange;
@@ -36,7 +34,7 @@ import 
com.azure.storage.blob.options.BlockBlobOutputStreamOptions;
 import com.azure.storage.blob.specialized.BlockBlobClient;
 import com.azure.storage.common.Utility;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Streams;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.logger.Logger;
 
@@ -49,6 +47,7 @@ import java.io.OutputStream;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Abstracts the Azure storage layer. Makes direct calls to Azure file system.
@@ -173,9 +172,17 @@ public class AzureStorage
   public void batchDeleteFiles(String containerName, Iterable<String> paths, 
Integer maxAttempts)
       throws BlobBatchStorageException
   {
-
-    BlobBatchClient blobBatchClient = new 
BlobBatchClientBuilder(getOrCreateBlobContainerClient(containerName, 
maxAttempts)).buildClient();
-    blobBatchClient.deleteBlobs(Lists.newArrayList(paths), 
DeleteSnapshotsOptionType.ONLY);
+    BlobContainerClient blobContainerClient = 
getOrCreateBlobContainerClient(containerName, maxAttempts);
+    List<String> blobUris = Streams.stream(paths).map(path -> 
blobContainerClient.getBlobContainerUrl() + "/" + 
path).collect(Collectors.toList());
+
+    // We have to call forEach on the response because this is the only way 
azure batch will throw an exception on a operation failure.
+    azureClientFactory.getBlobBatchClient(blobContainerClient).deleteBlobs(
+        blobUris,
+        DeleteSnapshotsOptionType.INCLUDE
+    ).forEach(response ->
+        log.debug("Deleting blob with URL %s completed with status code %d%n",
+            response.getRequest().getUrl(), response.getStatusCode())
+    );
   }
 
   public List<String> listDir(final String containerName, final String 
virtualDirPath, final Integer maxAttempts)
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java
index cd93c80ba14..be7041d7a99 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.storage.azure.output;
 
+import com.azure.storage.blob.batch.BlobBatchStorageException;
 import com.azure.storage.blob.models.BlobStorageException;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Iterables;
@@ -158,7 +159,7 @@ public class AzureStorageConnector extends 
ChunkingStorageConnector<AzureInputRa
           config.getMaxRetry()
       );
     }
-    catch (BlobStorageException e) {
+    catch (BlobStorageException | BlobBatchStorageException e) {
       throw new IOException(e);
     }
   }
@@ -173,7 +174,7 @@ public class AzureStorageConnector extends 
ChunkingStorageConnector<AzureInputRa
           config.getMaxRetry()
       );
     }
-    catch (BlobStorageException e) {
+    catch (BlobStorageException | BlobBatchStorageException e) {
       throw new IOException(e);
     }
   }
diff --git 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
index 5b980915ea9..c31dedf2191 100644
--- 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
+++ 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
@@ -23,17 +23,22 @@ import com.azure.core.http.rest.PagedIterable;
 import com.azure.core.http.rest.PagedResponse;
 import com.azure.storage.blob.BlobContainerClient;
 import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.batch.BlobBatchClient;
 import com.azure.storage.blob.models.BlobItem;
 import com.azure.storage.blob.models.BlobItemProperties;
 import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.DeleteSnapshotsOptionType;
 import com.google.common.collect.ImmutableList;
 import org.apache.druid.common.guava.SettableSupplier;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatchers;
 import org.mockito.Mockito;
 
+import java.util.List;
+
 
 // Using Mockito for the whole test class since azure classes (e.g. 
BlobContainerClient) are final and can't be mocked with EasyMock
 public class AzureStorageTest
@@ -87,5 +92,29 @@ public class AzureStorageTest
 
     Assert.assertEquals(ImmutableList.of(BLOB_NAME), 
azureStorage.listDir(CONTAINER, "", null));
   }
+
+  @Test
+  public void testBatchDeleteFiles_emptyResponse() throws BlobStorageException
+  {
+    String containerUrl = 
"https://implysaasdeveastussa.blob.core.windows.net/container";;
+    BlobBatchClient blobBatchClient = Mockito.mock(BlobBatchClient.class);
+
+    SettableSupplier<PagedResponse<BlobItem>> supplier = new 
SettableSupplier<>();
+    supplier.set(new TestPagedResponse<>(ImmutableList.of()));
+    PagedIterable<BlobItem> pagedIterable = new PagedIterable<>(supplier);
+
+    ArgumentCaptor<List<String>> captor = ArgumentCaptor.forClass(List.class);
+
+    
Mockito.doReturn(containerUrl).when(blobContainerClient).getBlobContainerUrl();
+    
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
+    
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null);
+    
Mockito.doReturn(blobBatchClient).when(azureClientFactory).getBlobBatchClient(blobContainerClient);
+    Mockito.doReturn(pagedIterable).when(blobBatchClient).deleteBlobs(
+        captor.capture(), 
ArgumentMatchers.eq(DeleteSnapshotsOptionType.INCLUDE)
+    );
+
+    azureStorage.batchDeleteFiles(CONTAINER, ImmutableList.of(BLOB_NAME), 
null);
+    Assert.assertEquals(captor.getValue().get(0), containerUrl + "/" + 
BLOB_NAME);
+  }
 }
 
diff --git 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java
 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java
index 1bdbe0a2ece..5201a3bdd9d 100644
--- 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java
+++ 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.storage.azure.output;
 
+import com.azure.core.http.HttpResponse;
 import com.azure.storage.blob.models.BlobStorageException;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -198,4 +199,30 @@ public class AzureStorageConnectorTest
     Assert.assertEquals(ImmutableList.of("x/y/z/" + TEST_FILE, "p/q/r/" + 
TEST_FILE), ret);
     EasyMock.reset(azureStorage);
   }
+
+  @Test
+  public void test_deleteFile_blobStorageException()
+  {
+    EasyMock.reset(azureStorage);
+    HttpResponse mockHttpResponse = EasyMock.createMock(HttpResponse.class);
+    azureStorage.batchDeleteFiles(EasyMock.anyString(), EasyMock.anyObject(), 
EasyMock.anyInt());
+    EasyMock.expectLastCall().andThrow(new BlobStorageException("error", 
mockHttpResponse, null));
+    EasyMock.replay(azureStorage);
+    Assert.assertThrows(IOException.class, () -> 
storageConnector.deleteFile("file"));
+    EasyMock.verify(azureStorage);
+    EasyMock.reset(azureStorage);
+  }
+
+  @Test
+  public void test_deleteFiles_blobStorageException()
+  {
+    EasyMock.reset(azureStorage);
+    HttpResponse mockHttpResponse = EasyMock.createMock(HttpResponse.class);
+    azureStorage.batchDeleteFiles(EasyMock.anyString(), EasyMock.anyObject(), 
EasyMock.anyInt());
+    EasyMock.expectLastCall().andThrow(new BlobStorageException("error", 
mockHttpResponse, null));
+    EasyMock.replay(azureStorage);
+    Assert.assertThrows(IOException.class, () -> 
storageConnector.deleteFiles(ImmutableList.of()));
+    EasyMock.verify(azureStorage);
+    EasyMock.reset(azureStorage);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to