suneet-s commented on code in PR #15287: URL: https://github.com/apache/druid/pull/15287#discussion_r1439612932
########## extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureAccountConfig.java: ########## @@ -46,6 +46,12 @@ public class AzureAccountConfig @JsonProperty private String sharedAccessStorageToken; + @JsonProperty + private String managedIdentityClientId; Review Comment: Can we add validation to error out if the user sets this without setting `useAzureCredentialsChain = true` as called out in the docs ########## extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java: ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.azure; + +import com.azure.core.http.policy.ExponentialBackoffOptions; +import com.azure.core.http.policy.RetryOptions; +import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.common.StorageSharedKeyCredential; + +import javax.annotation.Nonnull; +import java.time.Duration; + +/** + * Factory class for generating BlobServiceClient objects. 
+ */ +public class AzureClientFactory +{ + + private final AzureAccountConfig config; + + public AzureClientFactory(AzureAccountConfig config) + { + this.config = config; + } + + public BlobServiceClient getBlobServiceClient() + { + return getAuthenticatedBlobServiceClientBuilder().buildClient(); + } + + /** + * Azure doesn't let us override retryConfigs on BlobServiceClient so we need a second instance. + * @param retryCount number of retries + * @return BlobServiceClient with a custom retryCount + */ + public BlobServiceClient getRetriableBlobServiceClient(@Nonnull Integer retryCount) Review Comment: Is there a reason for this factory to return both a retriable and non-retriable client? ########## extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java: ########## @@ -127,159 +130,137 @@ public OutputStream getBlockBlobOutputStream( final String blobPath, @Nullable final Integer streamWriteSizeBytes, Integer maxAttempts - ) throws URISyntaxException, StorageException + ) throws BlobStorageException { - CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName); - CloudBlockBlob blockBlobReference = container.getBlockBlobReference(blobPath); + BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts); + BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(Utility.urlEncode(blobPath)).getBlockBlobClient(); - if (blockBlobReference.exists()) { + if (blockBlobClient.exists()) { throw new RE("Reference already exists"); } - + BlockBlobOutputStreamOptions options = new BlockBlobOutputStreamOptions(); if (streamWriteSizeBytes != null) { - blockBlobReference.setStreamWriteSizeInBytes(streamWriteSizeBytes); + options.setParallelTransferOptions(new ParallelTransferOptions().setBlockSizeLong(streamWriteSizeBytes.longValue())); } - - return blockBlobReference.openOutputStream(null, getRequestOptionsWithRetry(maxAttempts), null); - + return 
blockBlobClient.getBlobOutputStream(options); } - public CloudBlob getBlockBlobReferenceWithAttributes(final String containerName, final String blobPath) - throws URISyntaxException, StorageException + // There's no need to download attributes with the new azure clients, they will get fetched as needed. + public BlockBlobClient getBlockBlobReferenceWithAttributes(final String containerName, final String blobPath) + throws BlobStorageException { - final CloudBlockBlob blobReference = getOrCreateCloudBlobContainer(containerName).getBlockBlobReference(blobPath); - blobReference.downloadAttributes(); - return blobReference; + return getOrCreateBlobContainerClient(containerName).getBlobClient(Utility.urlEncode(blobPath)).getBlockBlobClient(); } public long getBlockBlobLength(final String containerName, final String blobPath) - throws URISyntaxException, StorageException + throws BlobStorageException { - return getBlockBlobReferenceWithAttributes(containerName, blobPath).getProperties().getLength(); + return getBlockBlobReferenceWithAttributes(containerName, blobPath).getProperties().getBlobSize(); } public InputStream getBlockBlobInputStream(final String containerName, final String blobPath) - throws URISyntaxException, StorageException + throws BlobStorageException { return getBlockBlobInputStream(0L, containerName, blobPath); } public InputStream getBlockBlobInputStream(long offset, final String containerName, final String blobPath) - throws URISyntaxException, StorageException + throws BlobStorageException { return getBlockBlobInputStream(offset, null, containerName, blobPath); } public InputStream getBlockBlobInputStream(long offset, Long length, final String containerName, final String blobPath) - throws URISyntaxException, StorageException + throws BlobStorageException { return getBlockBlobInputStream(offset, length, containerName, blobPath, null); } public InputStream getBlockBlobInputStream(long offset, Long length, final String containerName, final String 
blobPath, Integer maxAttempts) - throws URISyntaxException, StorageException + throws BlobStorageException { - CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName); - return container.getBlockBlobReference(blobPath) - .openInputStream(offset, length, null, getRequestOptionsWithRetry(maxAttempts), null); + BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts); + return blobContainerClient.getBlobClient(Utility.urlEncode(blobPath)).openInputStream(new BlobInputStreamOptions().setRange(new BlobRange(offset, length))); } public void batchDeleteFiles(String containerName, Iterable<String> paths, Integer maxAttempts) - throws URISyntaxException, StorageException + throws BlobBatchStorageException { - CloudBlobContainer cloudBlobContainer = getOrCreateCloudBlobContainer(containerName); - BlobDeleteBatchOperation blobDeleteBatchOperation = new BlobDeleteBatchOperation(); - for (String path : paths) { - CloudBlob blobReference = cloudBlobContainer.getBlockBlobReference(path); - blobDeleteBatchOperation.addSubOperation(blobReference); - } - cloudBlobClient.get().executeBatch(blobDeleteBatchOperation, getRequestOptionsWithRetry(maxAttempts), null); - } - public List<String> listDir(final String containerName, final String virtualDirPath) - throws URISyntaxException, StorageException - { - return listDir(containerName, virtualDirPath, null); + BlobBatchClient blobBatchClient = new BlobBatchClientBuilder(getOrCreateBlobContainerClient(containerName, maxAttempts)).buildClient(); + blobBatchClient.deleteBlobs(Lists.newArrayList(paths), DeleteSnapshotsOptionType.ONLY); } public List<String> listDir(final String containerName, final String virtualDirPath, final Integer maxAttempts) - throws StorageException, URISyntaxException + throws BlobStorageException { List<String> files = new ArrayList<>(); - CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName); + BlobContainerClient 
blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts); - for (ListBlobItem blobItem : - container.listBlobs(virtualDirPath, USE_FLAT_BLOB_LISTING, null, getRequestOptionsWithRetry(maxAttempts), null)) { - CloudBlob cloudBlob = (CloudBlob) blobItem; - files.add(cloudBlob.getName()); - } + PagedIterable<BlobItem> blobItems = blobContainerClient.listBlobs( + new ListBlobsOptions().setPrefix(virtualDirPath), + Duration.ofMillis(DELTA_BACKOFF_MS) + ); + + blobItems.iterableByPage().forEach(page -> page.getElements().forEach(blob -> files.add(blob.getName()))); return files; } - public boolean getBlockBlobExists(String container, String blobPath) throws URISyntaxException, StorageException + public boolean getBlockBlobExists(String container, String blobPath) throws BlobStorageException { return getBlockBlobExists(container, blobPath, null); } public boolean getBlockBlobExists(String container, String blobPath, Integer maxAttempts) - throws URISyntaxException, StorageException + throws BlobStorageException { - return getOrCreateCloudBlobContainer(container).getBlockBlobReference(blobPath) - .exists(null, getRequestOptionsWithRetry(maxAttempts), null); + return getOrCreateBlobContainerClient(container, maxAttempts).getBlobClient(Utility.urlEncode(blobPath)).exists(); } - /** - * If maxAttempts is provided, this method returns request options with retry built in. 
- * Retry backoff is exponential backoff, with maxAttempts set to the one provided - */ - @Nullable - private BlobRequestOptions getRequestOptionsWithRetry(Integer maxAttempts) + @VisibleForTesting + BlobServiceClient getRetriableBlobServiceClient(@Nonnull Integer maxAttempts) Review Comment: ```suggestion BlobServiceClient getRetriableBlobServiceClient(int maxAttempts) ``` ########## extensions-core/azure-extensions/pom.xml: ########## @@ -40,29 +40,30 @@ <version>${project.parent.version}</version> <scope>provided</scope> </dependency> - <dependency> - <groupId>com.microsoft.azure</groupId> - <artifactId>azure-storage</artifactId> - <version>8.6.0</version> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </exclusion> - <exclusion> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - </exclusion> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </exclusion> - </exclusions> + <groupId>com.azure</groupId> + <artifactId>azure-identity</artifactId> + <version>1.10.1</version> + </dependency> + <dependency> + <groupId>com.azure</groupId> + <artifactId>azure-storage-blob</artifactId> + <version>12.24.0</version> + </dependency> + <dependency> + <groupId>com.azure</groupId> + <artifactId>azure-storage-blob-batch</artifactId> + <version>12.20.0</version> + </dependency> + <dependency> + <groupId>com.azure</groupId> + <artifactId>azure-storage-common</artifactId> + <version>12.23.0</version> + </dependency> Review Comment: https://learn.microsoft.com/en-us/azure/developer/java/sdk/get-started-maven#use-the-azure-sdk-for-java-build-tool - Have you looked into adding this to see if the Azure libraries are added correctly? 
########## extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java: ########## @@ -127,159 +130,137 @@ public OutputStream getBlockBlobOutputStream( final String blobPath, @Nullable final Integer streamWriteSizeBytes, Integer maxAttempts - ) throws URISyntaxException, StorageException + ) throws BlobStorageException { - CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName); - CloudBlockBlob blockBlobReference = container.getBlockBlobReference(blobPath); + BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts); + BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(Utility.urlEncode(blobPath)).getBlockBlobClient(); - if (blockBlobReference.exists()) { + if (blockBlobClient.exists()) { throw new RE("Reference already exists"); } - + BlockBlobOutputStreamOptions options = new BlockBlobOutputStreamOptions(); if (streamWriteSizeBytes != null) { - blockBlobReference.setStreamWriteSizeInBytes(streamWriteSizeBytes); + options.setParallelTransferOptions(new ParallelTransferOptions().setBlockSizeLong(streamWriteSizeBytes.longValue())); } - - return blockBlobReference.openOutputStream(null, getRequestOptionsWithRetry(maxAttempts), null); - + return blockBlobClient.getBlobOutputStream(options); } - public CloudBlob getBlockBlobReferenceWithAttributes(final String containerName, final String blobPath) - throws URISyntaxException, StorageException + // There's no need to download attributes with the new azure clients, they will get fetched as needed. 
+ public BlockBlobClient getBlockBlobReferenceWithAttributes(final String containerName, final String blobPath) + throws BlobStorageException { - final CloudBlockBlob blobReference = getOrCreateCloudBlobContainer(containerName).getBlockBlobReference(blobPath); - blobReference.downloadAttributes(); - return blobReference; + return getOrCreateBlobContainerClient(containerName).getBlobClient(Utility.urlEncode(blobPath)).getBlockBlobClient(); } public long getBlockBlobLength(final String containerName, final String blobPath) - throws URISyntaxException, StorageException + throws BlobStorageException { - return getBlockBlobReferenceWithAttributes(containerName, blobPath).getProperties().getLength(); + return getBlockBlobReferenceWithAttributes(containerName, blobPath).getProperties().getBlobSize(); } public InputStream getBlockBlobInputStream(final String containerName, final String blobPath) - throws URISyntaxException, StorageException + throws BlobStorageException { return getBlockBlobInputStream(0L, containerName, blobPath); } public InputStream getBlockBlobInputStream(long offset, final String containerName, final String blobPath) - throws URISyntaxException, StorageException + throws BlobStorageException { return getBlockBlobInputStream(offset, null, containerName, blobPath); } public InputStream getBlockBlobInputStream(long offset, Long length, final String containerName, final String blobPath) - throws URISyntaxException, StorageException + throws BlobStorageException { return getBlockBlobInputStream(offset, length, containerName, blobPath, null); } public InputStream getBlockBlobInputStream(long offset, Long length, final String containerName, final String blobPath, Integer maxAttempts) - throws URISyntaxException, StorageException + throws BlobStorageException { - CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName); - return container.getBlockBlobReference(blobPath) - .openInputStream(offset, length, null, 
getRequestOptionsWithRetry(maxAttempts), null); + BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts); + return blobContainerClient.getBlobClient(Utility.urlEncode(blobPath)).openInputStream(new BlobInputStreamOptions().setRange(new BlobRange(offset, length))); } public void batchDeleteFiles(String containerName, Iterable<String> paths, Integer maxAttempts) - throws URISyntaxException, StorageException + throws BlobBatchStorageException { - CloudBlobContainer cloudBlobContainer = getOrCreateCloudBlobContainer(containerName); - BlobDeleteBatchOperation blobDeleteBatchOperation = new BlobDeleteBatchOperation(); - for (String path : paths) { - CloudBlob blobReference = cloudBlobContainer.getBlockBlobReference(path); - blobDeleteBatchOperation.addSubOperation(blobReference); - } - cloudBlobClient.get().executeBatch(blobDeleteBatchOperation, getRequestOptionsWithRetry(maxAttempts), null); - } - public List<String> listDir(final String containerName, final String virtualDirPath) - throws URISyntaxException, StorageException - { - return listDir(containerName, virtualDirPath, null); + BlobBatchClient blobBatchClient = new BlobBatchClientBuilder(getOrCreateBlobContainerClient(containerName, maxAttempts)).buildClient(); + blobBatchClient.deleteBlobs(Lists.newArrayList(paths), DeleteSnapshotsOptionType.ONLY); } public List<String> listDir(final String containerName, final String virtualDirPath, final Integer maxAttempts) - throws StorageException, URISyntaxException + throws BlobStorageException { List<String> files = new ArrayList<>(); - CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName); + BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts); - for (ListBlobItem blobItem : - container.listBlobs(virtualDirPath, USE_FLAT_BLOB_LISTING, null, getRequestOptionsWithRetry(maxAttempts), null)) { - CloudBlob cloudBlob = (CloudBlob) blobItem; - 
files.add(cloudBlob.getName()); - } + PagedIterable<BlobItem> blobItems = blobContainerClient.listBlobs( + new ListBlobsOptions().setPrefix(virtualDirPath), + Duration.ofMillis(DELTA_BACKOFF_MS) + ); + + blobItems.iterableByPage().forEach(page -> page.getElements().forEach(blob -> files.add(blob.getName()))); return files; } - public boolean getBlockBlobExists(String container, String blobPath) throws URISyntaxException, StorageException + public boolean getBlockBlobExists(String container, String blobPath) throws BlobStorageException { return getBlockBlobExists(container, blobPath, null); } public boolean getBlockBlobExists(String container, String blobPath, Integer maxAttempts) - throws URISyntaxException, StorageException + throws BlobStorageException { - return getOrCreateCloudBlobContainer(container).getBlockBlobReference(blobPath) - .exists(null, getRequestOptionsWithRetry(maxAttempts), null); + return getOrCreateBlobContainerClient(container, maxAttempts).getBlobClient(Utility.urlEncode(blobPath)).exists(); } - /** - * If maxAttempts is provided, this method returns request options with retry built in. - * Retry backoff is exponential backoff, with maxAttempts set to the one provided - */ - @Nullable - private BlobRequestOptions getRequestOptionsWithRetry(Integer maxAttempts) + @VisibleForTesting + BlobServiceClient getRetriableBlobServiceClient(@Nonnull Integer maxAttempts) { - if (maxAttempts == null) { - return null; + // To avoid keeping a ton of clients in memory, if maxAttempts is specified use a client with at least that many retries configured. Review Comment: This seems a little confusing - why would we want max attempts to mean "at least this many retry attempts"? Also, if this is the desired behavior, we can push this logic into the factory instead of having it in the storage class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
