georgew5656 commented on code in PR #15287:
URL: https://github.com/apache/druid/pull/15287#discussion_r1394331964


##########
extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java:
##########
@@ -127,159 +130,136 @@ public OutputStream getBlockBlobOutputStream(
       final String blobPath,
       @Nullable final Integer streamWriteSizeBytes,
       Integer maxAttempts
-  ) throws URISyntaxException, StorageException
+  ) throws BlobStorageException
   {
-    CloudBlobContainer container = 
getOrCreateCloudBlobContainer(containerName);
-    CloudBlockBlob blockBlobReference = 
container.getBlockBlobReference(blobPath);
+    BlobContainerClient blobContainerClient = 
getOrCreateBlobContainerClient(containerName, maxAttempts);
+    BlockBlobClient blockBlobClient = 
blobContainerClient.getBlobClient(blobPath).getBlockBlobClient();
 
-    if (blockBlobReference.exists()) {
+    if (blockBlobClient.exists()) {
       throw new RE("Reference already exists");
     }
-
+    BlockBlobOutputStreamOptions options = new BlockBlobOutputStreamOptions();
     if (streamWriteSizeBytes != null) {
-      blockBlobReference.setStreamWriteSizeInBytes(streamWriteSizeBytes);
+      options.setParallelTransferOptions(new 
ParallelTransferOptions().setBlockSizeLong(streamWriteSizeBytes.longValue()));
     }
-
-    return blockBlobReference.openOutputStream(null, 
getRequestOptionsWithRetry(maxAttempts), null);
-
+    return blockBlobClient.getBlobOutputStream(options);
   }
 
-  public CloudBlob getBlockBlobReferenceWithAttributes(final String 
containerName, final String blobPath)
-      throws URISyntaxException, StorageException
+  // There's no need to download attributes with the new azure clients, they 
will get fetched as needed.
+  public BlockBlobClient getBlockBlobReferenceWithAttributes(final String 
containerName, final String blobPath)
+      throws BlobStorageException
   {
-    final CloudBlockBlob blobReference = 
getOrCreateCloudBlobContainer(containerName).getBlockBlobReference(blobPath);
-    blobReference.downloadAttributes();
-    return blobReference;
+    return 
getOrCreateBlobContainerClient(containerName).getBlobClient(blobPath).getBlockBlobClient();
   }
 
   public long getBlockBlobLength(final String containerName, final String 
blobPath)
-      throws URISyntaxException, StorageException
+      throws BlobStorageException
   {
-    return getBlockBlobReferenceWithAttributes(containerName, 
blobPath).getProperties().getLength();
+    return getBlockBlobReferenceWithAttributes(containerName, 
blobPath).getProperties().getBlobSize();
   }
 
   public InputStream getBlockBlobInputStream(final String containerName, final 
String blobPath)
-      throws URISyntaxException, StorageException
+      throws BlobStorageException
   {
     return getBlockBlobInputStream(0L, containerName, blobPath);
   }
 
   public InputStream getBlockBlobInputStream(long offset, final String 
containerName, final String blobPath)
-      throws URISyntaxException, StorageException
+      throws BlobStorageException
   {
     return getBlockBlobInputStream(offset, null, containerName, blobPath);
   }
 
   public InputStream getBlockBlobInputStream(long offset, Long length, final 
String containerName, final String blobPath)
-      throws URISyntaxException, StorageException
+      throws BlobStorageException
   {
     return getBlockBlobInputStream(offset, length, containerName, blobPath, 
null);
   }
 
   public InputStream getBlockBlobInputStream(long offset, Long length, final 
String containerName, final String blobPath, Integer maxAttempts)
-      throws URISyntaxException, StorageException
+      throws BlobStorageException
   {
-    CloudBlobContainer container = 
getOrCreateCloudBlobContainer(containerName);
-    return container.getBlockBlobReference(blobPath)
-                    .openInputStream(offset, length, null, 
getRequestOptionsWithRetry(maxAttempts), null);
+    BlobContainerClient blobContainerClient = 
getOrCreateBlobContainerClient(containerName, maxAttempts);
+    return blobContainerClient.getBlobClient(blobPath).openInputStream(new 
BlobInputStreamOptions().setRange(new BlobRange(offset, length)));
   }
 
   public void batchDeleteFiles(String containerName, Iterable<String> paths, 
Integer maxAttempts)
-      throws URISyntaxException, StorageException
+      throws BlobBatchStorageException
   {
-    CloudBlobContainer cloudBlobContainer = 
getOrCreateCloudBlobContainer(containerName);
-    BlobDeleteBatchOperation blobDeleteBatchOperation = new 
BlobDeleteBatchOperation();
-    for (String path : paths) {
-      CloudBlob blobReference = cloudBlobContainer.getBlockBlobReference(path);
-      blobDeleteBatchOperation.addSubOperation(blobReference);
-    }
-    cloudBlobClient.get().executeBatch(blobDeleteBatchOperation, 
getRequestOptionsWithRetry(maxAttempts), null);
+
+    BlobBatchClient blobBatchClient = new 
BlobBatchClientBuilder(getOrCreateBlobContainerClient(containerName, 
maxAttempts)).buildClient();
+    blobBatchClient.deleteBlobs(Lists.newArrayList(paths), 
DeleteSnapshotsOptionType.ONLY);
   }
 
   public List<String> listDir(final String containerName, final String 
virtualDirPath)
-      throws URISyntaxException, StorageException
+      throws BlobStorageException
   {
     return listDir(containerName, virtualDirPath, null);
   }
 
   public List<String> listDir(final String containerName, final String 
virtualDirPath, final Integer maxAttempts)
-      throws StorageException, URISyntaxException
+      throws BlobStorageException
   {
     List<String> files = new ArrayList<>();
-    CloudBlobContainer container = 
getOrCreateCloudBlobContainer(containerName);
+    BlobContainerClient blobContainerClient = 
getOrCreateBlobContainerClient(containerName, maxAttempts);
 
-    for (ListBlobItem blobItem :
-        container.listBlobs(virtualDirPath, USE_FLAT_BLOB_LISTING, null, 
getRequestOptionsWithRetry(maxAttempts), null)) {
-      CloudBlob cloudBlob = (CloudBlob) blobItem;
-      files.add(cloudBlob.getName());
-    }
+    PagedIterable<BlobItem> blobItems = blobContainerClient.listBlobs(
+        new ListBlobsOptions().setPrefix(virtualDirPath),
+        Duration.ofMillis(DELTA_BACKOFF_MS)
+    );
+
+    blobItems.iterableByPage().forEach(page -> page.getElements().forEach(blob 
-> files.add(blob.getName())));
 
     return files;
   }
 
-  public boolean getBlockBlobExists(String container, String blobPath) throws 
URISyntaxException, StorageException
+  public boolean getBlockBlobExists(String container, String blobPath) throws 
BlobStorageException
   {
     return getBlockBlobExists(container, blobPath, null);
   }
 
 
   public boolean getBlockBlobExists(String container, String blobPath, Integer 
maxAttempts)
-      throws URISyntaxException, StorageException
+      throws BlobStorageException
   {
-    return 
getOrCreateCloudBlobContainer(container).getBlockBlobReference(blobPath)
-                                                   .exists(null, 
getRequestOptionsWithRetry(maxAttempts), null);
-  }
-
-  /**
-   * If maxAttempts is provided, this method returns request options with 
retry built in.
-   * Retry backoff is exponential backoff, with maxAttempts set to the one 
provided
-   */
-  @Nullable
-  private BlobRequestOptions getRequestOptionsWithRetry(Integer maxAttempts)
-  {
-    if (maxAttempts == null) {
-      return null;
-    }
-    BlobRequestOptions requestOptions = new BlobRequestOptions();
-    requestOptions.setRetryPolicyFactory(new 
RetryExponentialRetry(DELTA_BACKOFF_MS, maxAttempts));
-    return requestOptions;
+    return getOrCreateBlobContainerClient(container, 
maxAttempts).getBlobClient(blobPath).exists();
   }
 
   @VisibleForTesting
-  CloudBlobClient getCloudBlobClient()
+  BlobServiceClient getBlobServiceClient()
   {
-    return this.cloudBlobClient.get();
+    return this.blobServiceClient.get();
   }
 
   @VisibleForTesting
-  ResultSegment<ListBlobItem> listBlobsWithPrefixInContainerSegmented(
+  PagedIterable<BlobItem> listBlobsWithPrefixInContainerSegmented(
       final String containerName,
       final String prefix,
-      ResultContinuation continuationToken,
       int maxResults
-  ) throws StorageException, URISyntaxException
+  ) throws BlobStorageException
   {
-    CloudBlobContainer cloudBlobContainer = 
cloudBlobClient.get().getContainerReference(containerName);
-    return cloudBlobContainer
-        .listBlobsSegmented(
-            prefix,
-            /* Use flat blob listing here so that we get only blob types and 
not directories.*/
-            USE_FLAT_BLOB_LISTING,
-            EnumSet
-                .noneOf(BlobListingDetails.class),
-            maxResults,
-            continuationToken,
-            null,
-            null
-        );
+    BlobContainerClient blobContainerClient = 
getOrCreateBlobContainerClient(containerName);
+    return blobContainerClient.listBlobs(
+        new 
ListBlobsOptions().setPrefix(prefix).setMaxResultsPerPage(maxResults),
+        Duration.ofMillis(DELTA_BACKOFF_MS)
+    );
   }
 
-  private CloudBlobContainer getOrCreateCloudBlobContainer(final String 
containerName)
-      throws StorageException, URISyntaxException
+  private BlobContainerClient getOrCreateBlobContainerClient(final String 
containerName)
   {
-    CloudBlobContainer cloudBlobContainer = 
cloudBlobClient.get().getContainerReference(containerName);
-    cloudBlobContainer.createIfNotExists();
+    return 
blobServiceClient.get().createBlobContainerIfNotExists(containerName);

Review Comment:
   Realizing that we probably need to check the maxRetries field as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to