This is an automated email from the ASF dual-hosted git repository.
georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new ef0232290cf Fix AzureStorage.batchDeleteFiles (#15730)
ef0232290cf is described below
commit ef0232290cf718e6d9db6e3627d26c4c80dfb4bd
Author: George Shiqi Wu <[email protected]>
AuthorDate: Wed Jan 24 14:37:58 2024 -0500
Fix AzureStorage.batchDeleteFiles (#15730)
* Fix param
* Fix batchDeleteFiles
* Fix unit tests
* Add tests
---
.../druid/storage/azure/AzureClientFactory.java | 10 ++++++++
.../apache/druid/storage/azure/AzureStorage.java | 19 +++++++++-----
.../azure/output/AzureStorageConnector.java | 5 ++--
.../druid/storage/azure/AzureStorageTest.java | 29 ++++++++++++++++++++++
.../azure/output/AzureStorageConnectorTest.java | 27 ++++++++++++++++++++
5 files changed, 82 insertions(+), 8 deletions(-)
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java
index 3625eaa813a..d72fcd8395f 100644
--- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java
@@ -22,8 +22,11 @@ package org.apache.druid.storage.azure;
import com.azure.core.http.policy.ExponentialBackoffOptions;
import com.azure.core.http.policy.RetryOptions;
import com.azure.identity.DefaultAzureCredentialBuilder;
+import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.batch.BlobBatchClient;
+import com.azure.storage.blob.batch.BlobBatchClientBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;
import java.time.Duration;
@@ -64,6 +67,13 @@ public class AzureClientFactory
return cachedBlobServiceClients.get(retryCount);
}
+
+ // Mainly here to make testing easier.
+ public BlobBatchClient getBlobBatchClient(BlobContainerClient
blobContainerClient)
+ {
+ return new BlobBatchClientBuilder(blobContainerClient).buildClient();
+ }
+
private BlobServiceClientBuilder getAuthenticatedBlobServiceClientBuilder()
{
BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder()
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
index aa7bef718cf..b929d255c1c 100644
--- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
@@ -22,8 +22,6 @@ package org.apache.druid.storage.azure;
import com.azure.core.http.rest.PagedIterable;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
-import com.azure.storage.blob.batch.BlobBatchClient;
-import com.azure.storage.blob.batch.BlobBatchClientBuilder;
import com.azure.storage.blob.batch.BlobBatchStorageException;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobRange;
@@ -36,7 +34,7 @@ import com.azure.storage.blob.options.BlockBlobOutputStreamOptions;
import com.azure.storage.blob.specialized.BlockBlobClient;
import com.azure.storage.common.Utility;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Streams;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.logger.Logger;
@@ -49,6 +47,7 @@ import java.io.OutputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
/**
* Abstracts the Azure storage layer. Makes direct calls to Azure file system.
@@ -173,9 +172,17 @@ public class AzureStorage
public void batchDeleteFiles(String containerName, Iterable<String> paths,
Integer maxAttempts)
throws BlobBatchStorageException
{
-
- BlobBatchClient blobBatchClient = new
BlobBatchClientBuilder(getOrCreateBlobContainerClient(containerName,
maxAttempts)).buildClient();
- blobBatchClient.deleteBlobs(Lists.newArrayList(paths),
DeleteSnapshotsOptionType.ONLY);
+ BlobContainerClient blobContainerClient =
getOrCreateBlobContainerClient(containerName, maxAttempts);
+ List<String> blobUris = Streams.stream(paths).map(path ->
blobContainerClient.getBlobContainerUrl() + "/" +
path).collect(Collectors.toList());
+
+ // We have to call forEach on the response because this is the only way
azure batch will throw an exception on a operation failure.
+ azureClientFactory.getBlobBatchClient(blobContainerClient).deleteBlobs(
+ blobUris,
+ DeleteSnapshotsOptionType.INCLUDE
+ ).forEach(response ->
+ log.debug("Deleting blob with URL %s completed with status code %d%n",
+ response.getRequest().getUrl(), response.getStatusCode())
+ );
}
public List<String> listDir(final String containerName, final String
virtualDirPath, final Integer maxAttempts)
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java
index cd93c80ba14..be7041d7a99 100644
--- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java
@@ -19,6 +19,7 @@
package org.apache.druid.storage.azure.output;
+import com.azure.storage.blob.batch.BlobBatchStorageException;
import com.azure.storage.blob.models.BlobStorageException;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
@@ -158,7 +159,7 @@ public class AzureStorageConnector extends ChunkingStorageConnector<AzureInputRa
config.getMaxRetry()
);
}
- catch (BlobStorageException e) {
+ catch (BlobStorageException | BlobBatchStorageException e) {
throw new IOException(e);
}
}
@@ -173,7 +174,7 @@ public class AzureStorageConnector extends ChunkingStorageConnector<AzureInputRa
config.getMaxRetry()
);
}
- catch (BlobStorageException e) {
+ catch (BlobStorageException | BlobBatchStorageException e) {
throw new IOException(e);
}
}
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
index 5b980915ea9..c31dedf2191 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
@@ -23,17 +23,22 @@ import com.azure.core.http.rest.PagedIterable;
import com.azure.core.http.rest.PagedResponse;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.batch.BlobBatchClient;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobItemProperties;
import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.DeleteSnapshotsOptionType;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.guava.SettableSupplier;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
+import java.util.List;
+
// Using Mockito for the whole test class since azure classes (e.g.
BlobContainerClient) are final and can't be mocked with EasyMock
public class AzureStorageTest
@@ -87,5 +92,29 @@ public class AzureStorageTest
Assert.assertEquals(ImmutableList.of(BLOB_NAME),
azureStorage.listDir(CONTAINER, "", null));
}
+
+ @Test
+ public void testBatchDeleteFiles_emptyResponse() throws BlobStorageException
+ {
+ String containerUrl =
"https://implysaasdeveastussa.blob.core.windows.net/container";
+ BlobBatchClient blobBatchClient = Mockito.mock(BlobBatchClient.class);
+
+ SettableSupplier<PagedResponse<BlobItem>> supplier = new
SettableSupplier<>();
+ supplier.set(new TestPagedResponse<>(ImmutableList.of()));
+ PagedIterable<BlobItem> pagedIterable = new PagedIterable<>(supplier);
+
+ ArgumentCaptor<List<String>> captor = ArgumentCaptor.forClass(List.class);
+
+
Mockito.doReturn(containerUrl).when(blobContainerClient).getBlobContainerUrl();
+
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
+
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null);
+
Mockito.doReturn(blobBatchClient).when(azureClientFactory).getBlobBatchClient(blobContainerClient);
+ Mockito.doReturn(pagedIterable).when(blobBatchClient).deleteBlobs(
+ captor.capture(),
ArgumentMatchers.eq(DeleteSnapshotsOptionType.INCLUDE)
+ );
+
+ azureStorage.batchDeleteFiles(CONTAINER, ImmutableList.of(BLOB_NAME),
null);
+ Assert.assertEquals(captor.getValue().get(0), containerUrl + "/" +
BLOB_NAME);
+ }
}
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java
index 1bdbe0a2ece..5201a3bdd9d 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.storage.azure.output;
+import com.azure.core.http.HttpResponse;
import com.azure.storage.blob.models.BlobStorageException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -198,4 +199,30 @@ public class AzureStorageConnectorTest
Assert.assertEquals(ImmutableList.of("x/y/z/" + TEST_FILE, "p/q/r/" +
TEST_FILE), ret);
EasyMock.reset(azureStorage);
}
+
+ @Test
+ public void test_deleteFile_blobStorageException()
+ {
+ EasyMock.reset(azureStorage);
+ HttpResponse mockHttpResponse = EasyMock.createMock(HttpResponse.class);
+ azureStorage.batchDeleteFiles(EasyMock.anyString(), EasyMock.anyObject(),
EasyMock.anyInt());
+ EasyMock.expectLastCall().andThrow(new BlobStorageException("error",
mockHttpResponse, null));
+ EasyMock.replay(azureStorage);
+ Assert.assertThrows(IOException.class, () ->
storageConnector.deleteFile("file"));
+ EasyMock.verify(azureStorage);
+ EasyMock.reset(azureStorage);
+ }
+
+ @Test
+ public void test_deleteFiles_blobStorageException()
+ {
+ EasyMock.reset(azureStorage);
+ HttpResponse mockHttpResponse = EasyMock.createMock(HttpResponse.class);
+ azureStorage.batchDeleteFiles(EasyMock.anyString(), EasyMock.anyObject(),
EasyMock.anyInt());
+ EasyMock.expectLastCall().andThrow(new BlobStorageException("error",
mockHttpResponse, null));
+ EasyMock.replay(azureStorage);
+ Assert.assertThrows(IOException.class, () ->
storageConnector.deleteFiles(ImmutableList.of()));
+ EasyMock.verify(azureStorage);
+ EasyMock.reset(azureStorage);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]