This is an automated email from the ASF dual-hosted git repository.
georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new e6a82e8a11f Only create container in `AzureStorage` for write
operations (#16558)
e6a82e8a11f is described below
commit e6a82e8a11f23bf5808ac5a6a7216c9c6a437c92
Author: Andreas Maechler <[email protected]>
AuthorDate: Fri Jun 7 10:47:51 2024 -0600
Only create container in `AzureStorage` for write operations (#16558)
* Remove unused constants
* Refactor getBlockBlobLength
* Better link
* Upper-case log
* Mark defaultStorageAccount nullable
This is the case if you do not use Azure for deep-storage but ingest from
Azure blobs.
* Do not always create a new container if it doesn't exist
Specifically, only create a container if uploading a blob or writing a blob
stream
* Add lots of comments, group methods
* Revert "Mark defaultStorageAccount nullable"
* Add mockito for junit
* Add extra test
* Add comment
Thanks George.
* Pass blockSize as Long
* Test more branches...
---
extensions-core/azure-extensions/pom.xml | 6 +
.../druid/data/input/azure/AzureInputSource.java | 8 +-
.../azure/AzureStorageAccountInputSource.java | 12 +-
.../storage/azure/AzureDataSegmentKiller.java | 1 -
.../apache/druid/storage/azure/AzureStorage.java | 441 ++++++++++++++-------
.../org/apache/druid/storage/azure/AzureUtils.java | 2 -
.../azure/output/AzureStorageConnector.java | 4 +-
.../druid/storage/azure/AzureStorageTest.java | 136 ++++++-
.../azure/output/AzureStorageConnectorTest.java | 2 +-
9 files changed, 421 insertions(+), 191 deletions(-)
diff --git a/extensions-core/azure-extensions/pom.xml
b/extensions-core/azure-extensions/pom.xml
index 2955d88c406..88a1198b61e 100644
--- a/extensions-core/azure-extensions/pom.xml
+++ b/extensions-core/azure-extensions/pom.xml
@@ -190,6 +190,12 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
index 339ec18ec3a..07a4c23370e 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
@@ -20,7 +20,6 @@
package org.apache.druid.data.input.azure;
import com.azure.storage.blob.models.BlobStorageException;
-import com.azure.storage.blob.specialized.BlockBlobClient;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -161,12 +160,7 @@ public class AzureInputSource extends
CloudObjectInputSource
public long getObjectSize(CloudObjectLocation location)
{
try {
- final BlockBlobClient blobWithAttributes =
storage.getBlockBlobReferenceWithAttributes(
- location.getBucket(),
- location.getPath()
- );
-
- return blobWithAttributes.getProperties().getBlobSize();
+ return storage.getBlockBlobLength(location.getBucket(),
location.getPath());
}
catch (BlobStorageException e) {
throw new RuntimeException(e);
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSource.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSource.java
index f4f59325306..f84f0474bfc 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSource.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSource.java
@@ -20,7 +20,6 @@
package org.apache.druid.data.input.azure;
import com.azure.storage.blob.models.BlobStorageException;
-import com.azure.storage.blob.specialized.BlockBlobClient;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -180,12 +179,7 @@ public class AzureStorageAccountInputSource extends
CloudObjectInputSource
try {
AzureStorage azureStorage = new
AzureStorage(azureIngestClientFactory, location.getBucket());
Pair<String, String> locationInfo =
getContainerAndPathFromObjectLocation(location);
- final BlockBlobClient blobWithAttributes =
azureStorage.getBlockBlobReferenceWithAttributes(
- locationInfo.lhs,
- locationInfo.rhs
- );
-
- return blobWithAttributes.getProperties().getBlobSize();
+ return azureStorage.getBlockBlobLength(locationInfo.lhs,
locationInfo.rhs);
}
catch (BlobStorageException e) {
throw new RuntimeException(e);
@@ -246,7 +240,9 @@ public class AzureStorageAccountInputSource extends
CloudObjectInputSource
public static Pair<String, String>
getContainerAndPathFromObjectLocation(CloudObjectLocation location)
{
String[] pathParts = location.getPath().split("/", 2);
- // If there is no path specified, use a empty path as azure will throw a
exception that is more clear than a index error.
+
+ // If there is no path specified, use an empty path as Azure will throw an
exception
+ // that is more clear than an index error.
return Pair.of(pathParts[0], pathParts.length == 2 ? pathParts[1] : "");
}
}
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java
index dd609e2f357..ce8a6cdd388 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java
@@ -43,7 +43,6 @@ import java.util.Map;
public class AzureDataSegmentKiller implements DataSegmentKiller
{
private static final Logger log = new Logger(AzureDataSegmentKiller.class);
- private static final Integer MAX_MULTI_OBJECT_DELETE_SIZE = 256; //
https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id
private final AzureDataSegmentConfig segmentConfig;
private final AzureInputDataConfig inputDataConfig;
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
index a7419181364..011812843f2 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
@@ -21,7 +21,6 @@ package org.apache.druid.storage.azure;
import com.azure.core.http.rest.PagedIterable;
import com.azure.storage.blob.BlobContainerClient;
-import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.batch.BlobBatchClient;
import com.azure.storage.blob.batch.BlobBatchStorageException;
import com.azure.storage.blob.models.BlobItem;
@@ -34,7 +33,6 @@ import com.azure.storage.blob.options.BlobInputStreamOptions;
import com.azure.storage.blob.options.BlockBlobOutputStreamOptions;
import com.azure.storage.blob.specialized.BlockBlobClient;
import com.azure.storage.common.Utility;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Streams;
import org.apache.druid.java.util.common.RE;
@@ -52,259 +50,402 @@ import java.util.List;
import java.util.stream.Collectors;
/**
- * Abstracts the Azure storage layer. Makes direct calls to Azure file system.
+ * Abstracts the Azure storage layer, wrapping the Azure Java SDK.
+ * <p>
+ * When using a service client ({@link
com.azure.storage.blob.BlobServiceClient}), methods that rely on a container to
+ * exist should use {@link
com.azure.storage.blob.BlobServiceClient#getBlobContainerClient}.
+ * <p>
+ * If a method relies on a container to be created if it doesn't exist, call
+ * {@link
com.azure.storage.blob.BlobServiceClient#createBlobContainerIfNotExists(String)}.
*/
public class AzureStorage
{
-
// Default value from Azure library
private static final int DELTA_BACKOFF_MS = 30_000;
- //
https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id
+ //
https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#request-body
private static final Integer MAX_MULTI_OBJECT_DELETE_SIZE = 256;
-
- private static final Logger log = new Logger(AzureStorage.class);
+ private static final Logger LOG = new Logger(AzureStorage.class);
private final AzureClientFactory azureClientFactory;
private final String defaultStorageAccount;
public AzureStorage(
- AzureClientFactory azureClientFactory,
- @Nullable String defaultStorageAccount
+ final AzureClientFactory azureClientFactory,
+ final String defaultStorageAccount
)
{
this.azureClientFactory = azureClientFactory;
this.defaultStorageAccount = defaultStorageAccount;
}
- public List<String> emptyCloudBlobDirectory(final String containerName,
final String virtualDirPath)
+ /**
+ * See {@link AzureStorage#emptyCloudBlobDirectory(String, String, Integer)}
for details.
+ */
+ public List<String> emptyCloudBlobDirectory(final String containerName,
@Nullable final String prefix)
throws BlobStorageException
{
- return emptyCloudBlobDirectory(containerName, virtualDirPath, null);
+ return emptyCloudBlobDirectory(containerName, prefix, null);
}
- public List<String> emptyCloudBlobDirectory(final String containerName,
final String virtualDirPath, final Integer maxAttempts)
- throws BlobStorageException
+ /**
+ * Delete all blobs under the given prefix.
+ *
+ * @param containerName The name of the storage container.
+ * @param prefix (Optional) The Azure storage prefix to delete blobs
for.
+ * If null, deletes all blobs in the storage container.
+ * @param maxAttempts (Optional) Number of attempts to retry in case an
API call fails.
+ * If null, defaults to the system default
(`druid.azure.maxTries`).
+ *
+ * @return The list of blobs deleted.
+ */
+ public List<String> emptyCloudBlobDirectory(
+ final String containerName,
+ @Nullable final String prefix,
+ @Nullable final Integer maxAttempts
+ ) throws BlobStorageException
{
- List<String> deletedFiles = new ArrayList<>();
- BlobContainerClient blobContainerClient =
getOrCreateBlobContainerClient(containerName, maxAttempts);
+ final BlobContainerClient blobContainerClient = azureClientFactory
+ .getBlobServiceClient(maxAttempts, defaultStorageAccount)
+ .getBlobContainerClient(containerName);
//
https://learn.microsoft.com/en-us/azure/storage/blobs/storage-blobs-list The
new client uses flat listing by default.
- PagedIterable<BlobItem> blobItems = blobContainerClient.listBlobs(new
ListBlobsOptions().setPrefix(virtualDirPath),
Duration.ofMillis(DELTA_BACKOFF_MS));
+ final PagedIterable<BlobItem> blobItems = blobContainerClient.listBlobs(
+ new ListBlobsOptions().setPrefix(prefix),
+ Duration.ofMillis(DELTA_BACKOFF_MS)
+ );
- blobItems.iterableByPage().forEach(page -> {
- page.getElements().forEach(blob -> {
- if
(blobContainerClient.getBlobClient(blob.getName()).deleteIfExists()) {
- deletedFiles.add(blob.getName());
- }
- });
- });
+ final List<String> deletedFiles = new ArrayList<>();
+ blobItems.iterableByPage().forEach(
+ page -> page.getElements().forEach(
+ blob -> {
+ if
(blobContainerClient.getBlobClient(blob.getName()).deleteIfExists()) {
+ deletedFiles.add(blob.getName());
+ }
+ }
+ )
+ );
if (deletedFiles.isEmpty()) {
- log.warn("No files were deleted on the following Azure path: [%s]",
virtualDirPath);
+ LOG.warn("No files were deleted on the following Azure path: [%s]",
prefix);
}
return deletedFiles;
}
- public void uploadBlockBlob(final File file, final String containerName,
final String blobPath, final Integer maxAttempts)
- throws IOException, BlobStorageException
- {
- BlobContainerClient blobContainerClient =
getOrCreateBlobContainerClient(containerName, maxAttempts);
-
- try (FileInputStream stream = new FileInputStream(file)) {
- // By default this creates a Block blob, no need to use a specific Block
blob client.
- // We also need to urlEncode the path to handle special characters.
- // Set overwrite to true to keep behavior more similar to
s3Client.putObject
-
blobContainerClient.getBlobClient(Utility.urlEncode(blobPath)).upload(stream,
file.length(), true);
- }
- }
-
+ /**
+ * Creates and opens an output stream to write data to the block blob.
+ * <p>
+ * If the blob already exists, an exception will be thrown.
+ *
+ * @param containerName The name of the storage container.
+ * @param blobName The name of the blob within the container.
+ * @param blockSize (Optional) The block size to use when writing the
blob.
+ * If null, the default block size will be used.
+ * @param maxAttempts (Optional) The maximum number of attempts to retry
the upload in case of failure.
+ * If null, the default value from the system
configuration (`druid.azure.maxTries`) will be used.
+ *
+ * @return An OutputStream for writing the blob.
+ */
public OutputStream getBlockBlobOutputStream(
final String containerName,
- final String blobPath,
- @Nullable final Integer streamWriteSizeBytes,
- Integer maxAttempts
+ final String blobName,
+ @Nullable final Long blockSize,
+ @Nullable final Integer maxAttempts
) throws BlobStorageException
{
- BlobContainerClient blobContainerClient =
getOrCreateBlobContainerClient(containerName, maxAttempts);
- BlockBlobClient blockBlobClient =
blobContainerClient.getBlobClient(Utility.urlEncode(blobPath)).getBlockBlobClient();
+ final BlockBlobClient blockBlobClient = azureClientFactory
+ .getBlobServiceClient(maxAttempts, defaultStorageAccount)
+ .createBlobContainerIfNotExists(containerName)
+ .getBlobClient(Utility.urlEncode(blobName))
+ .getBlockBlobClient();
+ // TODO based on the usage here, it might be better to overwrite the
existing blob instead; that's what StorageConnector#write documents it does
if (blockBlobClient.exists()) {
throw new RE("Reference already exists");
}
- BlockBlobOutputStreamOptions options = new BlockBlobOutputStreamOptions();
- if (streamWriteSizeBytes != null) {
- options.setParallelTransferOptions(new
ParallelTransferOptions().setBlockSizeLong(streamWriteSizeBytes.longValue()));
+
+ final BlockBlobOutputStreamOptions options = new
BlockBlobOutputStreamOptions();
+ if (blockSize != null) {
+ options.setParallelTransferOptions(new
ParallelTransferOptions().setBlockSizeLong(blockSize));
}
- return blockBlobClient.getBlobOutputStream(options);
- }
- // There's no need to download attributes with the new azure clients, they
will get fetched as needed.
- public BlockBlobClient getBlockBlobReferenceWithAttributes(final String
containerName, final String blobPath)
- throws BlobStorageException
- {
- return
getOrCreateBlobContainerClient(containerName).getBlobClient(Utility.urlEncode(blobPath)).getBlockBlobClient();
+ return blockBlobClient.getBlobOutputStream(options);
}
- public long getBlockBlobLength(final String containerName, final String
blobPath)
- throws BlobStorageException
+ /**
+ * Gets the length of the specified block blob.
+ *
+ * @param containerName The name of the storage container.
+ * @param blobName The name of the blob within the container.
+ *
+ * @return The length of the blob in bytes.
+ */
+ public long getBlockBlobLength(final String containerName, final String
blobName) throws BlobStorageException
{
- return getBlockBlobReferenceWithAttributes(containerName,
blobPath).getProperties().getBlobSize();
+ return azureClientFactory
+ .getBlobServiceClient(null, defaultStorageAccount)
+ .getBlobContainerClient(containerName)
+ .getBlobClient(Utility.urlEncode(blobName))
+ .getBlockBlobClient()
+ .getProperties()
+ .getBlobSize();
}
- public InputStream getBlockBlobInputStream(final String containerName, final
String blobPath)
+ /**
+ * See {@link AzureStorage#getBlockBlobInputStream(long, Long, String,
String, Integer)} for details.
+ */
+ public InputStream getBlockBlobInputStream(final String containerName, final
String blobName)
throws BlobStorageException
{
- return getBlockBlobInputStream(0L, containerName, blobPath);
+ return getBlockBlobInputStream(0L, containerName, blobName);
}
- public InputStream getBlockBlobInputStream(long offset, final String
containerName, final String blobPath)
+ /**
+ * See {@link AzureStorage#getBlockBlobInputStream(long, Long, String,
String, Integer)} for details.
+ */
+ public InputStream getBlockBlobInputStream(final long offset, final String
containerName, final String blobName)
throws BlobStorageException
{
- return getBlockBlobInputStream(offset, null, containerName, blobPath);
+ return getBlockBlobInputStream(offset, null, containerName, blobName);
}
- public InputStream getBlockBlobInputStream(long offset, Long length, final
String containerName, final String blobPath)
- throws BlobStorageException
+ /**
+ * See {@link AzureStorage#getBlockBlobInputStream(long, Long, String,
String, Integer)} for details.
+ */
+ public InputStream getBlockBlobInputStream(
+ final long offset,
+ @Nullable final Long length,
+ final String containerName,
+ final String blobName
+ ) throws BlobStorageException
{
- return getBlockBlobInputStream(offset, length, containerName, blobPath,
null);
+ return getBlockBlobInputStream(offset, length, containerName, blobName,
null);
}
- public InputStream getBlockBlobInputStream(long offset, Long length, final
String containerName, final String blobPath, Integer maxAttempts)
- throws BlobStorageException
+ /**
+ * Gets an InputStream for reading the contents of the specified block blob.
+ *
+ * @param containerName The name of the storage container.
+ * @param blobName The name of the blob within the container.
+ *
+ * @return An InputStream for reading the blob.
+ */
+ public InputStream getBlockBlobInputStream(
+ final long offset,
+ @Nullable final Long length,
+ final String containerName,
+ final String blobName,
+ @Nullable final Integer maxAttempts
+ ) throws BlobStorageException
{
- BlobContainerClient blobContainerClient =
getOrCreateBlobContainerClient(containerName, maxAttempts);
- return
blobContainerClient.getBlobClient(Utility.urlEncode(blobPath)).openInputStream(new
BlobInputStreamOptions().setRange(new BlobRange(offset, length)));
+ return azureClientFactory
+ .getBlobServiceClient(maxAttempts, defaultStorageAccount)
+ .getBlobContainerClient(containerName)
+ .getBlobClient(Utility.urlEncode(blobName))
+ .openInputStream(new BlobInputStreamOptions().setRange(new
BlobRange(offset, length)));
}
/**
* Deletes multiple files from the specified container.
*
- * @param containerName The name of the container from which files will be
deleted.
+ * @param containerName The name of the storage container.
* @param paths An iterable of file paths to be deleted.
- * @param maxAttempts (Optional) The maximum number of attempts to delete
each file.
- * If null, the system default number of attempts will
be used.
+ * @param maxAttempts (Optional) Number of attempts to retry in case an
API call fails.
+ * If null, defaults to the system default
(`druid.azure.maxTries`).
+ *
* @return true if all files were successfully deleted; false otherwise.
*/
- public boolean batchDeleteFiles(String containerName, Iterable<String>
paths, @Nullable Integer maxAttempts)
- throws BlobBatchStorageException
+ public boolean batchDeleteFiles(
+ final String containerName,
+ final Iterable<String> paths,
+ @Nullable final Integer maxAttempts
+ ) throws BlobBatchStorageException
{
- BlobContainerClient blobContainerClient =
getOrCreateBlobContainerClient(containerName, maxAttempts);
+ BlobContainerClient blobContainerClient = azureClientFactory
+ .getBlobServiceClient(maxAttempts, defaultStorageAccount)
+ .getBlobContainerClient(containerName);
+
BlobBatchClient blobBatchClient =
azureClientFactory.getBlobBatchClient(blobContainerClient);
- List<String> blobUris = Streams.stream(paths).map(path ->
blobContainerClient.getBlobContainerUrl() + "/" +
path).collect(Collectors.toList());
+ List<String> blobUris = Streams.stream(paths)
+ .map(path ->
blobContainerClient.getBlobContainerUrl() + "/" + path)
+ .collect(Collectors.toList());
+
boolean hadException = false;
- List<List<String>> keysChunks = Lists.partition(
- blobUris,
- MAX_MULTI_OBJECT_DELETE_SIZE
- );
+ List<List<String>> keysChunks = Lists.partition(blobUris,
MAX_MULTI_OBJECT_DELETE_SIZE);
for (List<String> chunkOfKeys : keysChunks) {
try {
- log.info(
- "Removing from container [%s] the following files: [%s]",
- containerName,
- chunkOfKeys
- );
- // We have to call forEach on the response because this is the only
way azure batch will throw an exception on a operation failure.
- blobBatchClient.deleteBlobs(
- chunkOfKeys,
- DeleteSnapshotsOptionType.INCLUDE
- ).forEach(response ->
- log.debug("Deleting blob with URL %s completed with status
code %d%n",
- response.getRequest().getUrl(),
response.getStatusCode())
- );
+ LOG.info("Removing from container [%s] the following files: [%s]",
containerName, chunkOfKeys);
+
+ // We have to call forEach on the response because this is the only
way azure batch will throw an exception on an operation failure.
+ blobBatchClient.deleteBlobs(chunkOfKeys,
DeleteSnapshotsOptionType.INCLUDE).forEach(response -> LOG.debug(
+ "Deleting blob with URL %s completed with status code %d%n",
+ response.getRequest().getUrl(),
+ response.getStatusCode()
+ ));
}
catch (BlobStorageException | BlobBatchStorageException e) {
hadException = true;
- log.noStackTrace().warn(e,
- "Unable to delete from container [%s], the following keys
[%s]",
- containerName,
- chunkOfKeys
+ LOG.noStackTrace().warn(
+ e,
+ "Unable to delete from container [%s], the following keys [%s]",
+ containerName,
+ chunkOfKeys
);
}
catch (Exception e) {
hadException = true;
- log.noStackTrace().warn(e,
- "Unexpected exception occurred when deleting from container
[%s], the following keys [%s]",
- containerName,
- chunkOfKeys
+ LOG.noStackTrace().warn(
+ e,
+ "Unexpected exception occurred when deleting from container [%s],
the following keys [%s]",
+ containerName,
+ chunkOfKeys
);
}
}
+
return !hadException;
}
- public List<String> listDir(final String containerName, final String
virtualDirPath, final Integer maxAttempts)
- throws BlobStorageException
+ /**
+ * See {@link AzureStorage#getBlockBlobExists(String, String, Integer)} for
details.
+ */
+ public boolean getBlockBlobExists(final String container, final String
blobName) throws BlobStorageException
{
- List<String> files = new ArrayList<>();
- BlobContainerClient blobContainerClient =
getOrCreateBlobContainerClient(containerName, maxAttempts);
-
- PagedIterable<BlobItem> blobItems = blobContainerClient.listBlobs(
- new ListBlobsOptions().setPrefix(virtualDirPath),
- Duration.ofMillis(DELTA_BACKOFF_MS)
- );
-
- blobItems.iterableByPage().forEach(page -> page.getElements().forEach(blob
-> files.add(blob.getName())));
-
- return files;
+ return getBlockBlobExists(container, blobName, null);
}
- public boolean getBlockBlobExists(String container, String blobPath) throws
BlobStorageException
+ /**
+ * Checks if the specified block blob exists in the given Azure Blob Storage
container.
+ *
+ * @param container The name of the Azure Blob Storage container.
+ * @param blobName The name of the blob within the container.
+ * @param maxAttempts (Optional) The maximum number of attempts to retry the
existence check in case of failure.
+ * If null, the default value from the system
configuration (`druid.azure.maxTries`) will be used.
+ * @return `true` if the block blob exists, `false` otherwise.
+ * @throws BlobStorageException If there is an error checking the existence
of the blob.
+ */
+ public boolean getBlockBlobExists(
+ final String container,
+ final String blobName,
+ @Nullable final Integer maxAttempts
+ ) throws BlobStorageException
{
- return getBlockBlobExists(container, blobPath, null);
+ return azureClientFactory
+ .getBlobServiceClient(maxAttempts, defaultStorageAccount)
+ .getBlobContainerClient(container)
+ .getBlobClient(Utility.urlEncode(blobName))
+ .exists();
}
-
- public boolean getBlockBlobExists(String container, String blobPath, Integer
maxAttempts)
- throws BlobStorageException
+ /**
+ * Lists the blobs in the specified storage container, optionally matching a
given prefix.
+ *
+ * @param storageAccount The name of the storage account.
+ * @param containerName The name of the storage container.
+ * @param prefix (Optional) The Azure storage prefix to list blobs
for.
+ * If null, lists all blobs in the storage container.
+ * @param maxResults (Optional) The maximum number of results to return
per page.
+ * If null, defaults to the API default (5000).
+ * @param maxAttempts (Optional) The maximum number of attempts to retry
the list operation in case of failure.
+ * If null, the default value from the system
configuration (`druid.azure.maxTries`) will be used.
+ *
+ * @return A lazily loaded list of blobs in this container.
+ */
+ public PagedIterable<BlobItem> listBlobsWithPrefixInContainerSegmented(
+ final String storageAccount,
+ final String containerName,
+ @Nullable final String prefix,
+ @Nullable final Integer maxResults,
+ @Nullable final Integer maxAttempts
+ ) throws BlobStorageException
{
- return getOrCreateBlobContainerClient(container,
maxAttempts).getBlobClient(Utility.urlEncode(blobPath)).exists();
- }
+ final ListBlobsOptions listOptions = new ListBlobsOptions();
- @VisibleForTesting
- BlobServiceClient getBlobServiceClient(Integer maxAttempts)
- {
- return azureClientFactory.getBlobServiceClient(maxAttempts,
defaultStorageAccount);
- }
+ if (maxResults != null) {
+ listOptions.setMaxResultsPerPage(maxResults);
+ }
- @VisibleForTesting
- BlobServiceClient getBlobServiceClient(String storageAccount, Integer
maxAttempts)
- {
- return azureClientFactory.getBlobServiceClient(maxAttempts,
storageAccount);
+ if (prefix != null) {
+ listOptions.setPrefix(prefix);
+ }
+
+ return azureClientFactory
+ .getBlobServiceClient(maxAttempts, storageAccount)
+ .getBlobContainerClient(containerName)
+ .listBlobs(listOptions, Duration.ofMillis(DELTA_BACKOFF_MS));
}
- // This method is used in AzureCloudBlobIterator in a method where one
azureStorage instance might need to list from multiple
- // storage accounts, so storageAccount is a valid parameter.
- @VisibleForTesting
- PagedIterable<BlobItem> listBlobsWithPrefixInContainerSegmented(
- final String storageAccount,
+ /**
+ * Lists the blobs in the specified storage container, optionally matching a
given prefix.
+ *
+ * @param containerName The name of the storage container.
+ * @param prefix (Optional) The Azure storage prefix to list blobs
for.
+ * If null, lists all blobs in the storage container.
+ * @param maxResults (Optional) The maximum number of results to return
per page.
+ * If null, defaults to the API default (5000).
+ * @param maxAttempts (Optional) The maximum number of attempts to retry
the list operation in case of failure.
+ * If null, the default value from the system
configuration (`druid.azure.maxTries`) will be used.
+ *
+ * @return A list of blob names in this container.
+ */
+ public List<String> listBlobs(
final String containerName,
- final String prefix,
- int maxResults,
- Integer maxAttempts
+ @Nullable final String prefix,
+ @Nullable final Integer maxResults,
+ @Nullable final Integer maxAttempts
) throws BlobStorageException
{
- BlobContainerClient blobContainerClient =
getOrCreateBlobContainerClient(storageAccount, containerName, maxAttempts);
- return blobContainerClient.listBlobs(
- new
ListBlobsOptions().setPrefix(prefix).setMaxResultsPerPage(maxResults),
- Duration.ofMillis(DELTA_BACKOFF_MS)
- );
- }
+ final ListBlobsOptions listOptions = new
ListBlobsOptions().setPrefix(prefix);
- private BlobContainerClient getOrCreateBlobContainerClient(final String
containerName)
- {
- return
getBlobServiceClient(null).createBlobContainerIfNotExists(containerName);
- }
+ if (maxResults != null) {
+ listOptions.setMaxResultsPerPage(maxResults);
+ }
- private BlobContainerClient getOrCreateBlobContainerClient(final String
containerName, final Integer maxRetries)
- {
- return
getBlobServiceClient(maxRetries).createBlobContainerIfNotExists(containerName);
+ if (prefix != null) {
+ listOptions.setPrefix(prefix);
+ }
+
+ final PagedIterable<BlobItem> blobItems = azureClientFactory
+ .getBlobServiceClient(maxAttempts, defaultStorageAccount)
+ .getBlobContainerClient(containerName)
+ .listBlobs(listOptions, Duration.ofMillis(DELTA_BACKOFF_MS));
+
+ final List<String> files = new ArrayList<>();
+ blobItems.iterableByPage().forEach(page -> page.getElements().forEach(blob
-> files.add(blob.getName())));
+
+ return files;
}
- private BlobContainerClient getOrCreateBlobContainerClient(final String
storageAccount, final String containerName, final Integer maxRetries)
+ /**
+ * Creates a new blob, or updates the content of an existing blob.
+ *
+ * @param file The file to write to the blob.
+ * @param containerName The name of the storage container.
+ * @param blobName The blob name to write the file to.
+ * @param maxAttempts (Optional) Number of attempts to retry in case an
API call fails.
+ * If null, defaults to the system default
(`druid.azure.maxTries`).
+ */
+ public void uploadBlockBlob(
+ final File file,
+ final String containerName,
+ final String blobName,
+ @Nullable final Integer maxAttempts
+ ) throws IOException, BlobStorageException
{
- return getBlobServiceClient(storageAccount,
maxRetries).createBlobContainerIfNotExists(containerName);
+ final BlobContainerClient blobContainerClient = azureClientFactory
+ .getBlobServiceClient(maxAttempts, defaultStorageAccount)
+ .createBlobContainerIfNotExists(containerName);
+
+ try (FileInputStream stream = new FileInputStream(file)) {
+ blobContainerClient
+ // Creates a blob by default, no need to use a specific blob client.
+ // We also need to urlEncode the path to handle special characters.
+ .getBlobClient(Utility.urlEncode(blobName))
+
+ // Set overwrite to true to keep behavior more similar to
s3Client.putObject.
+ .upload(stream, file.length(), true);
+ }
}
}
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java
index 2f6d07d542e..2760647073a 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java
@@ -37,8 +37,6 @@ import java.util.concurrent.TimeoutException;
*/
public class AzureUtils
{
-
- public static final String DEFAULT_AZURE_ENDPOINT_SUFFIX =
"core.windows.net";
@VisibleForTesting
static final String AZURE_STORAGE_HOST_ADDRESS = "blob.core.windows.net";
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java
index be7041d7a99..2d6276c5a6d 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java
@@ -140,7 +140,7 @@ public class AzureStorageConnector extends
ChunkingStorageConnector<AzureInputRa
return azureStorage.getBlockBlobOutputStream(
config.getContainer(),
objectPath(path),
- config.getChunkSize().getBytesInInt(),
+ config.getChunkSize().getBytes(),
config.getMaxRetry()
);
}
@@ -196,7 +196,7 @@ public class AzureStorageConnector extends
ChunkingStorageConnector<AzureInputRa
final String prefixBasePath = objectPath(dirName);
List<String> paths;
try {
- paths = azureStorage.listDir(config.getContainer(), prefixBasePath,
config.getMaxRetry());
+ paths = azureStorage.listBlobs(config.getContainer(), prefixBasePath,
null, config.getMaxRetry());
}
catch (BlobStorageException e) {
throw new IOException(e);
diff --git
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
index b61e7234014..3d76719732a 100644
---
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
+++
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
@@ -29,15 +29,25 @@ import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobItemProperties;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.DeleteSnapshotsOptionType;
+import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
+import com.azure.storage.blob.specialized.BlockBlobClient;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.guava.SettableSupplier;
+import org.apache.druid.java.util.common.RE;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.NullSource;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -46,32 +56,112 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-
-// Using Mockito for the whole test class since azure classes (e.g. BlobContainerClient) are final and can't be mocked with EasyMock
+/**
+ * Using Mockito for the whole test class since Azure classes (e.g. BlobContainerClient) are final
+ * and can't be mocked with EasyMock.
+ */
+@ExtendWith(MockitoExtension.class)
public class AzureStorageTest
{
- AzureStorage azureStorage;
- BlobClient blobClient = Mockito.mock(BlobClient.class);
- BlobServiceClient blobServiceClient = Mockito.mock(BlobServiceClient.class);
- BlobContainerClient blobContainerClient =
Mockito.mock(BlobContainerClient.class);
- AzureClientFactory azureClientFactory =
Mockito.mock(AzureClientFactory.class);
-
private final String STORAGE_ACCOUNT = "storageAccount";
private final String CONTAINER = "container";
private final String BLOB_NAME = "blobName";
+ private AzureStorage azureStorage;
+
+ @Mock
+ private AzureClientFactory azureClientFactory;
+ @Mock
+ private BlockBlobClient blockBlobClient;
+ @Mock
+ private BlobClient blobClient;
+ @Mock
+ private BlobContainerClient blobContainerClient;
+ @Mock
+ private BlobServiceClient blobServiceClient;
+
@BeforeEach
public void setup() throws BlobStorageException
{
azureStorage = new AzureStorage(azureClientFactory, STORAGE_ACCOUNT);
}
+ @ParameterizedTest
+ @ValueSource(longs = {-1, BlockBlobAsyncClient.MAX_STAGE_BLOCK_BYTES_LONG + 1})
+ public void testGetBlockBlockOutputStream_blockSizeOutOfBoundsException(final long blockSize)
+ {
+
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
+
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null,
STORAGE_ACCOUNT);
+
Mockito.doReturn(blobClient).when(blobContainerClient).getBlobClient(BLOB_NAME);
+ Mockito.doReturn(blockBlobClient).when(blobClient).getBlockBlobClient();
+
+ final IllegalArgumentException exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> azureStorage.getBlockBlobOutputStream(
+ CONTAINER,
+ BLOB_NAME,
+ blockSize,
+ null
+ )
+ );
+
+ assertEquals(
+ "The value of the parameter 'blockSize' should be between 1 and 4194304000.",
+ exception.getMessage()
+ );
+ }
+
@Test
- public void testListDir_retriable() throws BlobStorageException
+ public void testGetBlockBlockOutputStream_blobAlreadyExistsException()
+ {
+
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
+
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null,
STORAGE_ACCOUNT);
+
Mockito.doReturn(blobClient).when(blobContainerClient).getBlobClient(BLOB_NAME);
+ Mockito.doReturn(blockBlobClient).when(blobClient).getBlockBlobClient();
+ Mockito.doReturn(true).when(blockBlobClient).exists();
+
+ final RE exception = assertThrows(
+ RE.class,
+ () -> azureStorage.getBlockBlobOutputStream(
+ CONTAINER,
+ BLOB_NAME,
+ 100L,
+ null
+ )
+ );
+
+ assertEquals(
+ "Reference already exists",
+ exception.getMessage()
+ );
+ }
+
+ @ParameterizedTest
+ @NullSource
+ @ValueSource(longs = {1, 100})
+ public void testGetBlockBlockOutputStream_success(@Nullable final Long blockSize)
+ {
+
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
+
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null,
STORAGE_ACCOUNT);
+
Mockito.doReturn(blobClient).when(blobContainerClient).getBlobClient(BLOB_NAME);
+ Mockito.doReturn(blockBlobClient).when(blobClient).getBlockBlobClient();
+
+ assertDoesNotThrow(() -> azureStorage.getBlockBlobOutputStream(
+ CONTAINER,
+ BLOB_NAME,
+ blockSize,
+ null
+ ));
+ }
+
+ @Test
+ public void testListBlobs_retriable() throws BlobStorageException
{
BlobItem blobItem = new BlobItem().setName(BLOB_NAME).setProperties(new
BlobItemProperties().setContentLength(10L));
SettableSupplier<PagedResponse<BlobItem>> supplier = new
SettableSupplier<>();
@@ -81,16 +171,19 @@ public class AzureStorageTest
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
-
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
+
Mockito.doReturn(blobContainerClient).when(blobServiceClient).getBlobContainerClient(CONTAINER);
final Integer maxAttempts = 3;
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(maxAttempts,
STORAGE_ACCOUNT);
- assertEquals(ImmutableList.of(BLOB_NAME), azureStorage.listDir(CONTAINER,
"", maxAttempts));
+ assertEquals(
+ ImmutableList.of(BLOB_NAME),
+ azureStorage.listBlobs(CONTAINER, "", null, maxAttempts)
+ );
}
@Test
- public void testListDir_nullMaxAttempts() throws BlobStorageException
+ public void testListBlobs_nullMaxAttempts() throws BlobStorageException
{
BlobItem blobItem = new BlobItem().setName(BLOB_NAME).setProperties(new
BlobItemProperties().setContentLength(10L));
SettableSupplier<PagedResponse<BlobItem>> supplier = new
SettableSupplier<>();
@@ -100,10 +193,13 @@ public class AzureStorageTest
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
-
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
+
Mockito.doReturn(blobContainerClient).when(blobServiceClient).getBlobContainerClient(CONTAINER);
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null,
STORAGE_ACCOUNT);
- assertEquals(ImmutableList.of(BLOB_NAME), azureStorage.listDir(CONTAINER,
"", null));
+ assertEquals(
+ ImmutableList.of(BLOB_NAME),
+ azureStorage.listBlobs(CONTAINER, "", null, null)
+ );
}
@Test
@@ -118,7 +214,7 @@ public class AzureStorageTest
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
-
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
+
Mockito.doReturn(blobContainerClient).when(blobServiceClient).getBlobContainerClient(CONTAINER);
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(3,
storageAccountCustom);
azureStorage.listBlobsWithPrefixInContainerSegmented(
@@ -143,7 +239,7 @@ public class AzureStorageTest
ArgumentCaptor<List<String>> captor = ArgumentCaptor.forClass(List.class);
Mockito.doReturn(containerUrl).when(blobContainerClient).getBlobContainerUrl();
-
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
+
Mockito.doReturn(blobContainerClient).when(blobServiceClient).getBlobContainerClient(CONTAINER);
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null,
STORAGE_ACCOUNT);
Mockito.doReturn(blobBatchClient).when(azureClientFactory).getBlobBatchClient(blobContainerClient);
Mockito.doReturn(pagedIterable).when(blobBatchClient).deleteBlobs(
@@ -167,11 +263,11 @@ public class AzureStorageTest
ArgumentCaptor<List<String>> captor = ArgumentCaptor.forClass(List.class);
Mockito.doReturn(containerUrl).when(blobContainerClient).getBlobContainerUrl();
-
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
+
Mockito.doReturn(blobContainerClient).when(blobServiceClient).getBlobContainerClient(CONTAINER);
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null,
STORAGE_ACCOUNT);
Mockito.doReturn(blobBatchClient).when(azureClientFactory).getBlobBatchClient(blobContainerClient);
Mockito.doThrow(new RuntimeException()).when(blobBatchClient).deleteBlobs(
- captor.capture(),
ArgumentMatchers.eq(DeleteSnapshotsOptionType.INCLUDE)
+ captor.capture(),
ArgumentMatchers.eq(DeleteSnapshotsOptionType.INCLUDE)
);
boolean deleteSuccessful = azureStorage.batchDeleteFiles(CONTAINER,
ImmutableList.of(BLOB_NAME), null);
@@ -192,11 +288,11 @@ public class AzureStorageTest
ArgumentCaptor<List<String>> captor = ArgumentCaptor.forClass(List.class);
Mockito.doReturn(containerUrl).when(blobContainerClient).getBlobContainerUrl();
-
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
+
Mockito.doReturn(blobContainerClient).when(blobServiceClient).getBlobContainerClient(CONTAINER);
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null,
STORAGE_ACCOUNT);
Mockito.doReturn(blobBatchClient).when(azureClientFactory).getBlobBatchClient(blobContainerClient);
Mockito.doReturn(pagedIterable).when(blobBatchClient).deleteBlobs(
- captor.capture(),
ArgumentMatchers.eq(DeleteSnapshotsOptionType.INCLUDE)
+ captor.capture(),
ArgumentMatchers.eq(DeleteSnapshotsOptionType.INCLUDE)
);
diff --git
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java
index 5219f2b5962..ad9c627f036 100644
---
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java
+++
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java
@@ -195,7 +195,7 @@ public class AzureStorageConnectorTest
public void testListDir() throws BlobStorageException, IOException
{
EasyMock.reset(azureStorage);
- EasyMock.expect(azureStorage.listDir(EasyMock.anyString(),
EasyMock.anyString(), EasyMock.anyInt()))
+ EasyMock.expect(azureStorage.listBlobs(EasyMock.anyString(),
EasyMock.anyString(), EasyMock.anyInt(), EasyMock.anyInt()))
.andReturn(ImmutableList.of(PREFIX + "/x/y/z/" + TEST_FILE, PREFIX
+ "/p/q/r/" + TEST_FILE));
EasyMock.replay(azureStorage);
List<String> ret = Lists.newArrayList(storageConnector.listDir(""));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]