This is an automated email from the ASF dual-hosted git repository.

georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e512249e37 Azure multi read options (#15630)
3e512249e37 is described below

commit 3e512249e37407c0ece2b241d4b16e448eb18f0a
Author: George Shiqi Wu <[email protected]>
AuthorDate: Thu Jan 25 13:29:16 2024 -0500

    Azure multi read options (#15630)
    
    * Include new dependencies
    
    * Mostly implemented
    
    * More azure fixes
    
    * Tests passing
    
    * Unit tests running
    
    * Test running after removing storage exception
    
    * Happy with coverage now
    
    * Add more tests
    
    * fix client factory
    
    * cleanup from testing
    
    * Remove old client
    
    * update docs
    
    * Exclude from spellcheck
    
    * Add licenses
    
    * Fix identity version
    
    * Save work
    
    * Add azure clients
    
    * add licenses
    
    * typos
    
    * Add dependencies
    
    * Exception is not thrown
    
    * Fix intellij check
    
    * Don't need to override
    
    * specify length
    
    * urldecode
    
    * encode path
    
    * Fix checks
    
    * Revert urlencode changes
    
    * Urlencode with azure library
    
    * Update docs/development/extensions-core/azure.md
    
    Co-authored-by: Abhishek Agarwal 
<[email protected]>
    
    * PR changes
    
    * Update docs/development/extensions-core/azure.md
    
    Co-authored-by: 317brian <[email protected]>
    
    * Add config for multiple storage accounts
    
    * Deprecate AzureTaskLogsConfig.maxRetries
    
    * Clean up azure retry block
    
    * logic update to reuse clients
    
    * fix comments
    
    * Create container conditionally
    
    * Fix key auth
    
    * save work
    
    * Fix unit tests
    
    * Revert old azure input type
    
    * Separate input source
    
    * save work
    
    * Add support for app registrations
    
    * Fix unit tests
    
    * clean up spacing
    
    * Add coverage
    
    * fixes from testing
    
    * cleanup some caching behavior
    
    * Add docs
    
    * Fix spelling issues
    
    * fix more spelling errors'
    
    * Fix intellij inspections
    
    * add simple changes from pr
    
    * save work on fixing bug
    
    * Fix unit tests
    
    * Add more testing
    
    * Fix unit test
    
    * Add tests
    
    * Add annotation for azureStorage
    
    * Fix up docs
    
    * Add comment for list method
    
    * Fix tests
    
    * Remove uneeded toString
    
    * Update docs/ingestion/input-sources.md
    
    Co-authored-by: 317brian <[email protected]>
    
    * Update docs/ingestion/input-sources.md
    
    Co-authored-by: 317brian <[email protected]>
    
    * Update docs/ingestion/input-sources.md
    
    Co-authored-by: 317brian <[email protected]>
    
    * Update docs/ingestion/input-sources.md
    
    Co-authored-by: 317brian <[email protected]>
    
    * Update docs/ingestion/input-sources.md
    
    Co-authored-by: 317brian <[email protected]>
    
    * Update docs/ingestion/input-sources.md
    
    Co-authored-by: 317brian <[email protected]>
    
    * Update docs/ingestion/input-sources.md
    
    Co-authored-by: 317brian <[email protected]>
    
    * Update docs/ingestion/input-sources.md
    
    Co-authored-by: 317brian <[email protected]>
    
    * Update docs/ingestion/input-sources.md
    
    Co-authored-by: 317brian <[email protected]>
    
    * PR changes
    
    * fix injection of StorageConnector
    
    * Fix checkstyle
    
    * clean up unit tests
    
    * More pr fixes
    
    ---------
    
    Co-authored-by: Abhishek Agarwal 
<[email protected]>
    Co-authored-by: 317brian <[email protected]>
---
 docs/ingestion/input-sources.md                    | 110 +++++++++++-
 .../apache/druid/data/input/azure/AzureEntity.java |  17 +-
 .../druid/data/input/azure/AzureEntityFactory.java |   7 +-
 .../druid/data/input/azure/AzureInputSource.java   |   7 +-
 ...ce.java => AzureStorageAccountInputSource.java} | 101 +++++++----
 .../AzureStorageAccountInputSourceConfig.java      | 138 +++++++++++++++
 .../druid/storage/azure/AzureByteSource.java       |   2 +-
 .../storage/azure/AzureByteSourceFactory.java      |   6 +-
 .../druid/storage/azure/AzureClientFactory.java    |  41 +++--
 .../storage/azure/AzureCloudBlobIterable.java      |   7 +-
 .../azure/AzureCloudBlobIterableFactory.java       |   2 +-
 .../storage/azure/AzureCloudBlobIterator.java      |  28 ++-
 .../azure/AzureCloudBlobIteratorFactory.java       |   2 +-
 .../storage/azure/AzureDataSegmentKiller.java      |   3 +-
 .../storage/azure/AzureDataSegmentPuller.java      |   6 +-
 .../storage/azure/AzureDataSegmentPusher.java      |   3 +-
 .../storage/azure/AzureIngestClientFactory.java    |  78 +++++++++
 .../apache/druid/storage/azure/AzureStorage.java   |  23 ++-
 .../storage/azure/AzureStorageDruidModule.java     |  15 +-
 .../apache/druid/storage/azure/AzureTaskLogs.java  |   3 +-
 .../org/apache/druid/storage/azure/AzureUtils.java |   2 +-
 .../druid/storage/azure/blob/CloudBlobHolder.java  |   9 +-
 .../output/AzureStorageConnectorProvider.java      |   2 +
 .../druid/data/input/azure/AzureEntityTest.java    |  70 ++++++--
 .../input/azure/AzureInputSourceSerdeTest.java     |   6 +-
 .../data/input/azure/AzureInputSourceTest.java     |  15 +-
 ...ava => AzureStorageAccountInputSourceTest.java} | 164 +++++++++++-------
 .../storage/azure/AzureClientFactoryTest.java      |  25 +--
 .../storage/azure/AzureCloudBlobIteratorTest.java  | 142 ++++++++++++++-
 .../storage/azure/AzureDataSegmentKillerTest.java  |   7 +-
 .../storage/azure/AzureDataSegmentPullerTest.java  |  16 +-
 .../azure/AzureIngestClientFactoryTest.java        | 190 +++++++++++++++++++++
 .../storage/azure/AzureStorageDruidModuleTest.java |  27 +--
 .../druid/storage/azure/AzureStorageTest.java      |  33 +++-
 .../druid/storage/azure/AzureTaskLogsTest.java     |  14 +-
 .../apache/druid/storage/azure/AzureTestUtils.java |   6 +-
 website/.spelling                                  |   3 +
 37 files changed, 1112 insertions(+), 218 deletions(-)

diff --git a/docs/ingestion/input-sources.md b/docs/ingestion/input-sources.md
index 3e11734ee55..8e70ed8b0ef 100644
--- a/docs/ingestion/input-sources.md
+++ b/docs/ingestion/input-sources.md
@@ -309,6 +309,112 @@ Google Cloud Storage object:
 The Azure input source reads objects directly from Azure Blob store or Azure 
Data Lake sources. You can
 specify objects as a list of file URI strings or prefixes. You can split the 
Azure input source for use with [Parallel task](./native-batch.md) indexing and 
each worker task reads one chunk of the split data.
 
+
+:::info
+The  old `azure` schema is deprecated. Update your specs to use the 
`azureStorage` schema described below instead.
+:::
+
+Sample specs:
+
+```json
+...
+    "ioConfig": {
+      "type": "index_parallel",
+      "inputSource": {
+        "type": "azureStorage",
+        "objectGlob": "**.json",
+        "uris": ["azureStorage://storageAccount/container/prefix1/file.json", 
"azureStorage://storageAccount/container/prefix2/file2.json"]
+      },
+      "inputFormat": {
+        "type": "json"
+      },
+      ...
+    },
+...
+```
+
+```json
+...
+    "ioConfig": {
+      "type": "index_parallel",
+      "inputSource": {
+        "type": "azureStorage",
+        "objectGlob": "**.parquet",
+        "prefixes": ["azureStorage://storageAccount/container/prefix1/", 
"azureStorage://storageAccount/container/prefix2/"]
+      },
+      "inputFormat": {
+        "type": "json"
+      },
+      ...
+    },
+...
+```
+
+
+```json
+...
+    "ioConfig": {
+      "type": "index_parallel",
+      "inputSource": {
+        "type": "azureStorage",
+        "objectGlob": "**.json",
+        "objects": [
+          { "bucket": "storageAccount", "path": 
"container/prefix1/file1.json"},
+          { "bucket": "storageAccount", "path": "container/prefix2/file2.json"}
+        ],
+        "properties": {
+          "sharedAccessStorageToken": "?sv=...<storage token secret>...",
+        }
+      },
+      "inputFormat": {
+        "type": "json"
+      },
+      ...
+    },
+...
+```
+
+|Property|Description|Default|Required|
+|--------|-----------|-------|---------|
+|type|Set the value to `azureStorage`.|None|yes|
+|uris|JSON array of URIs where the Azure objects to be ingested are located. 
Use this format: 
`azureStorage://STORAGE_ACCOUNT/CONTAINER/PATH_TO_FILE`|None|One of the 
following must be set:`uris`, `prefixes`, or `objects`.|
+|prefixes|JSON array of URI prefixes for the locations of Azure objects to 
ingest. Use this format`azureStorage://STORAGE_ACCOUNT/CONTAINER/PREFIX`. Empty 
objects starting with any of the given prefixes are skipped.|None|One of the 
following must be set:`uris`, `prefixes`, or `objects`.|
+|objects|JSON array of Azure objects to ingest.|None|One of the following must 
be set:`uris`, `prefixes`, or `objects`.|
+|objectGlob|A glob for the object part of the Azure URI. In the URI 
`azureStorage://foo/bar/file.json`, the glob is applied to `bar/file.json`.<br 
/><br />The glob must match the entire object part, not just the filename. For 
example, the glob `*.json` does not match `azureStorage://foo/bar/file.json` 
because the object part is `bar/file.json`, and the`*` does not match the 
slash. To match all objects ending in `.json`, use `**.json` instead.<br /><br 
/>For more information, refer to the [...]
+|systemFields|JSON array of system fields to return as part of input rows. 
Possible values: `__file_uri` (Azure blob URI starting with `azureStorage://`), 
`__file_bucket` (Azure bucket), and `__file_path` (Azure object path).|None|no|
+|properties|Properties object for overriding the default Azure configuration. 
See below for more information.|None|No (defaults will be used if not given)
+
+Note that the Azure input source skips all empty objects only when `prefixes` 
is specified.
+
+The `objects` property can one of the following:
+
+|Property|Description|Default|Required|
+|--------|-----------|-------|---------|
+|bucket|Name of the Azure Blob Storage or Azure Data Lake storage 
account|None|yes|
+|path|The container and path where data is located.|None|yes|
+
+
+The `properties` property can be one of the following:
+
+- `sharedAccessStorageToken`
+- `key` 
+- `appRegistrationClientId`, `appRegistrationClientSecret`, and `tenantId` 
+- empty
+
+
+|Property|Description|Default|Required|
+|--------|-----------|-------|---------|
+|sharedAccessStorageToken|The plain text string of this Azure Blob Storage 
Shared Access Token|None|No|
+|key|The root key of Azure Blob Storage Account|None|no|
+|appRegistrationClientId|The client ID of the Azure App registration to 
authenticate as|None|No|
+|appRegistrationClientSecret|The client secret of the Azure App registration 
to authenticate as|None|Yes if `appRegistrationClientId` is provided|
+|tenantId|The tenant ID of the Azure App registration to authenticate 
as|None|Yes if `appRegistrationClientId` is provided|
+
+<details closed>
+  <summary>Show the deprecated 'azure' input source</summary>
+
+Note that the deprecated `azure` input source doesn't support specifying which 
storage account to ingest from. We recommend using the `azureStorage` instead.
+
 Sample specs:
 
 ```json
@@ -372,7 +478,7 @@ Sample specs:
 |uris|JSON array of URIs where the Azure objects to be ingested are located, 
in the form `azure://<container>/<path-to-file>`|None|`uris` or `prefixes` or 
`objects` must be set|
 |prefixes|JSON array of URI prefixes for the locations of Azure objects to 
ingest, in the form `azure://<container>/<prefix>`. Empty objects starting with 
one of the given prefixes are skipped.|None|`uris` or `prefixes` or `objects` 
must be set|
 |objects|JSON array of Azure objects to ingest.|None|`uris` or `prefixes` or 
`objects` must be set|
-|objectGlob|A glob for the object part of the S3 URI. In the URI 
`s3://foo/bar/file.json`, the glob is applied to `bar/file.json`.<br /><br 
/>The glob must match the entire object part, not just the filename. For 
example, the glob `*.json` does not match `s3://foo/bar/file.json`, because the 
object part is `bar/file.json`, and the`*` does not match the slash. To match 
all objects ending in `.json`, use `**.json` instead.<br /><br />For more 
information, refer to the documentation for [`F [...]
+|objectGlob|A glob for the object part of the Azure URI. In the URI 
`azure://foo/bar/file.json`, the glob is applied to `bar/file.json`.<br /><br 
/>The glob must match the entire object part, not just the filename. For 
example, the glob `*.json` does not match `azure://foo/bar/file.json`, because 
the object part is `bar/file.json`, and the`*` does not match the slash. To 
match all objects ending in `.json`, use `**.json` instead.<br /><br />For more 
information, refer to the documentatio [...]
 |systemFields|JSON array of system fields to return as part of input rows. 
Possible values: `__file_uri` (Azure blob URI starting with `azure://`), 
`__file_bucket` (Azure bucket), and `__file_path` (Azure object path).|None|no|
 
 Note that the Azure input source skips all empty objects only when `prefixes` 
is specified.
@@ -384,6 +490,8 @@ The `objects` property is:
 |bucket|Name of the Azure Blob Storage or Azure Data Lake container|None|yes|
 |path|The path where data is located.|None|yes|
 
+</details>
+
 ## HDFS input source
 
 :::info
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureEntity.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureEntity.java
index fc04b7b710f..31439c91fbb 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureEntity.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureEntity.java
@@ -24,8 +24,10 @@ import com.google.inject.assistedinject.Assisted;
 import com.google.inject.assistedinject.AssistedInject;
 import org.apache.druid.data.input.RetryingInputEntity;
 import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.storage.azure.AzureByteSource;
 import org.apache.druid.storage.azure.AzureByteSourceFactory;
+import org.apache.druid.storage.azure.AzureStorage;
 import org.apache.druid.storage.azure.AzureUtils;
 
 import javax.annotation.Nonnull;
@@ -40,21 +42,32 @@ public class AzureEntity extends RetryingInputEntity
 {
   private final CloudObjectLocation location;
   private final AzureByteSource byteSource;
+  private final String scheme;
 
   @AssistedInject
   AzureEntity(
       @Nonnull @Assisted CloudObjectLocation location,
+      @Nonnull @Assisted AzureStorage azureStorage,
+      @Nonnull @Assisted String scheme,
       @Nonnull AzureByteSourceFactory byteSourceFactory
+
   )
   {
     this.location = location;
-    this.byteSource = byteSourceFactory.create(location.getBucket(), 
location.getPath());
+    this.scheme = scheme;
+    // If scheme is azureStorage, containerName is the first prefix in the 
path, otherwise containerName is the bucket
+    if (AzureStorageAccountInputSource.SCHEME.equals(this.scheme)) {
+      Pair<String, String> locationInfo = 
AzureStorageAccountInputSource.getContainerAndPathFromObjectLocation(location);
+      this.byteSource = byteSourceFactory.create(locationInfo.lhs, 
locationInfo.rhs, azureStorage);
+    } else {
+      this.byteSource = byteSourceFactory.create(location.getBucket(), 
location.getPath(), azureStorage);
+    }
   }
 
   @Override
   public URI getUri()
   {
-    return location.toUri(AzureInputSource.SCHEME);
+    return location.toUri(this.scheme);
   }
 
   @Override
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureEntityFactory.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureEntityFactory.java
index 86f64a1f986..b75fab78078 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureEntityFactory.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureEntityFactory.java
@@ -20,11 +20,16 @@
 package org.apache.druid.data.input.azure;
 
 import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.storage.azure.AzureStorage;
 
 /**
  * Factory for creating {@link AzureEntity} objects
  */
 public interface AzureEntityFactory
 {
-  AzureEntity create(CloudObjectLocation location);
+  AzureEntity create(
+      CloudObjectLocation location,
+      AzureStorage azureStorage,
+      String scheme
+  );
 }
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
index 8cb6ff14976..339ec18ec3a 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
@@ -35,6 +35,7 @@ import 
org.apache.druid.data.input.impl.CloudObjectSplitWidget;
 import org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.data.input.impl.systemfield.SystemField;
 import org.apache.druid.data.input.impl.systemfield.SystemFields;
+import org.apache.druid.guice.annotations.Global;
 import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
 import org.apache.druid.storage.azure.AzureInputDataConfig;
 import org.apache.druid.storage.azure.AzureStorage;
@@ -63,7 +64,7 @@ public class AzureInputSource extends CloudObjectInputSource
 
   @JsonCreator
   public AzureInputSource(
-      @JacksonInject AzureStorage storage,
+      @JacksonInject @Global AzureStorage storage,
       @JacksonInject AzureEntityFactory entityFactory,
       @JacksonInject AzureCloudBlobIterableFactory 
azureCloudBlobIterableFactory,
       @JacksonInject AzureInputDataConfig inputDataConfig,
@@ -128,7 +129,7 @@ public class AzureInputSource extends CloudObjectInputSource
   @Override
   protected AzureEntity createEntity(CloudObjectLocation location)
   {
-    return entityFactory.create(location);
+    return entityFactory.create(location, storage, SCHEME);
   }
 
   @Override
@@ -140,7 +141,7 @@ public class AzureInputSource extends CloudObjectInputSource
       public Iterator<LocationWithSize> 
getDescriptorIteratorForPrefixes(List<URI> prefixes)
       {
         return Iterators.transform(
-            azureCloudBlobIterableFactory.create(getPrefixes(), 
inputDataConfig.getMaxListingLength()).iterator(),
+            azureCloudBlobIterableFactory.create(getPrefixes(), 
inputDataConfig.getMaxListingLength(), storage).iterator(),
             blob -> {
               try {
                 return new LocationWithSize(
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSource.java
similarity index 59%
copy from 
extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
copy to 
extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSource.java
index 8cb6ff14976..f4f59325306 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSource.java
@@ -24,6 +24,7 @@ import com.azure.storage.blob.specialized.BlockBlobClient;
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
@@ -35,7 +36,10 @@ import 
org.apache.druid.data.input.impl.CloudObjectSplitWidget;
 import org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.data.input.impl.systemfield.SystemField;
 import org.apache.druid.data.input.impl.systemfield.SystemFields;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.storage.azure.AzureAccountConfig;
 import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
+import org.apache.druid.storage.azure.AzureIngestClientFactory;
 import org.apache.druid.storage.azure.AzureInputDataConfig;
 import org.apache.druid.storage.azure.AzureStorage;
 
@@ -52,36 +56,45 @@ import java.util.Set;
  * Abstracts the Azure storage system where input data is stored. Allows users 
to retrieve entities in
  * the storage system that match either a particular uri, prefix, or object.
  */
-public class AzureInputSource extends CloudObjectInputSource
+public class AzureStorageAccountInputSource extends CloudObjectInputSource
 {
-  public static final String SCHEME = "azure";
+  public static final String SCHEME = "azureStorage";
 
-  private final AzureStorage storage;
   private final AzureEntityFactory entityFactory;
   private final AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
   private final AzureInputDataConfig inputDataConfig;
+  private final AzureStorageAccountInputSourceConfig 
azureStorageAccountInputSourceConfig;
+  private final AzureAccountConfig azureAccountConfig;
+
+  private final AzureIngestClientFactory azureIngestClientFactory;
 
   @JsonCreator
-  public AzureInputSource(
-      @JacksonInject AzureStorage storage,
+  public AzureStorageAccountInputSource(
       @JacksonInject AzureEntityFactory entityFactory,
       @JacksonInject AzureCloudBlobIterableFactory 
azureCloudBlobIterableFactory,
       @JacksonInject AzureInputDataConfig inputDataConfig,
+      @JacksonInject AzureAccountConfig azureAccountConfig,
       @JsonProperty("uris") @Nullable List<URI> uris,
       @JsonProperty("prefixes") @Nullable List<URI> prefixes,
       @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
       @JsonProperty("objectGlob") @Nullable String objectGlob,
+      @JsonProperty("properties") @Nullable 
AzureStorageAccountInputSourceConfig azureStorageAccountInputSourceConfig,
       @JsonProperty(SYSTEM_FIELDS_PROPERTY) @Nullable SystemFields systemFields
   )
   {
     super(SCHEME, uris, prefixes, objects, objectGlob, systemFields);
-    this.storage = Preconditions.checkNotNull(storage, "AzureStorage");
     this.entityFactory = Preconditions.checkNotNull(entityFactory, 
"AzureEntityFactory");
     this.azureCloudBlobIterableFactory = Preconditions.checkNotNull(
         azureCloudBlobIterableFactory,
         "AzureCloudBlobIterableFactory"
     );
     this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, 
"AzureInputDataConfig");
+    this.azureStorageAccountInputSourceConfig = 
azureStorageAccountInputSourceConfig;
+    this.azureAccountConfig = azureAccountConfig;
+    this.azureIngestClientFactory = new AzureIngestClientFactory(
+        azureAccountConfig,
+        azureStorageAccountInputSourceConfig
+    );
   }
 
   @JsonIgnore
@@ -95,15 +108,16 @@ public class AzureInputSource extends 
CloudObjectInputSource
   @Override
   public SplittableInputSource<List<CloudObjectLocation>> 
withSplit(InputSplit<List<CloudObjectLocation>> split)
   {
-    return new AzureInputSource(
-        storage,
+    return new AzureStorageAccountInputSource(
         entityFactory,
         azureCloudBlobIterableFactory,
         inputDataConfig,
+        azureAccountConfig,
         null,
         null,
         split.get(),
         getObjectGlob(),
+        azureStorageAccountInputSourceConfig,
         systemFields
     );
   }
@@ -111,15 +125,15 @@ public class AzureInputSource extends 
CloudObjectInputSource
   @Override
   public Object getSystemFieldValue(InputEntity entity, SystemField field)
   {
-    final AzureEntity googleEntity = (AzureEntity) entity;
+    final AzureEntity azureEntity = (AzureEntity) entity;
 
     switch (field) {
       case URI:
-        return googleEntity.getUri().toString();
+        return azureEntity.getUri().toString();
       case BUCKET:
-        return googleEntity.getLocation().getBucket();
+        return azureEntity.getLocation().getBucket();
       case PATH:
-        return googleEntity.getLocation().getPath();
+        return azureEntity.getLocation().getPath();
       default:
         return null;
     }
@@ -128,7 +142,11 @@ public class AzureInputSource extends 
CloudObjectInputSource
   @Override
   protected AzureEntity createEntity(CloudObjectLocation location)
   {
-    return entityFactory.create(location);
+    return entityFactory.create(
+        location,
+        new AzureStorage(azureIngestClientFactory, location.getBucket()),
+        SCHEME
+    );
   }
 
   @Override
@@ -140,12 +158,12 @@ public class AzureInputSource extends 
CloudObjectInputSource
       public Iterator<LocationWithSize> 
getDescriptorIteratorForPrefixes(List<URI> prefixes)
       {
         return Iterators.transform(
-            azureCloudBlobIterableFactory.create(getPrefixes(), 
inputDataConfig.getMaxListingLength()).iterator(),
+            azureCloudBlobIterableFactory.create(prefixes, 
inputDataConfig.getMaxListingLength(), new 
AzureStorage(azureIngestClientFactory, null)).iterator(),
             blob -> {
               try {
                 return new LocationWithSize(
-                    blob.getContainerName(),
-                    blob.getName(),
+                    blob.getStorageAccount(),
+                    blob.getContainerName() + "/" + blob.getName(),
                     blob.getBlobLength()
                 );
               }
@@ -160,9 +178,11 @@ public class AzureInputSource extends 
CloudObjectInputSource
       public long getObjectSize(CloudObjectLocation location)
       {
         try {
-          final BlockBlobClient blobWithAttributes = 
storage.getBlockBlobReferenceWithAttributes(
-              location.getBucket(),
-              location.getPath()
+          AzureStorage azureStorage = new 
AzureStorage(azureIngestClientFactory, location.getBucket());
+          Pair<String, String> locationInfo = 
getContainerAndPathFromObjectLocation(location);
+          final BlockBlobClient blobWithAttributes = 
azureStorage.getBlockBlobReferenceWithAttributes(
+              locationInfo.lhs,
+              locationInfo.rhs
           );
 
           return blobWithAttributes.getProperties().getBlobSize();
@@ -176,6 +196,14 @@ public class AzureInputSource extends 
CloudObjectInputSource
     return new SplitWidget();
   }
 
+  @Nullable
+  @JsonProperty("properties")
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public AzureStorageAccountInputSourceConfig 
getAzureStorageAccountInputSourceConfig()
+  {
+    return azureStorageAccountInputSourceConfig;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -188,28 +216,37 @@ public class AzureInputSource extends 
CloudObjectInputSource
     if (!super.equals(o)) {
       return false;
     }
-    AzureInputSource that = (AzureInputSource) o;
-    return storage.equals(that.storage) &&
-           entityFactory.equals(that.entityFactory) &&
-           
azureCloudBlobIterableFactory.equals(that.azureCloudBlobIterableFactory) &&
-           inputDataConfig.equals(that.inputDataConfig);
+    AzureStorageAccountInputSource that = (AzureStorageAccountInputSource) o;
+    return inputDataConfig.equals(that.inputDataConfig) &&
+        
azureStorageAccountInputSourceConfig.equals(that.azureStorageAccountInputSourceConfig)
 &&
+        azureAccountConfig.equals(that.azureAccountConfig) &&
+        azureIngestClientFactory.equals(that.azureIngestClientFactory);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(super.hashCode(), storage, entityFactory, 
azureCloudBlobIterableFactory, inputDataConfig);
+    return Objects.hash(super.hashCode(), inputDataConfig, 
azureStorageAccountInputSourceConfig, azureAccountConfig, 
azureIngestClientFactory);
   }
 
   @Override
   public String toString()
   {
-    return "AzureInputSource{" +
-           "uris=" + getUris() +
-           ", prefixes=" + getPrefixes() +
-           ", objects=" + getObjects() +
-           ", objectGlob=" + getObjectGlob() +
-           (systemFields.getFields().isEmpty() ? "" : ", systemFields=" + 
systemFields) +
-           '}';
+    return "AzureStorageAccountInputSource{" +
+        "uris=" + getUris() +
+        ", prefixes=" + getPrefixes() +
+        ", objects=" + getObjects() +
+        ", objectGlob=" + getObjectGlob() +
+        ", azureStorageAccountInputSourceConfig=" + 
getAzureStorageAccountInputSourceConfig() +
+        (systemFields.getFields().isEmpty() ? "" : ", systemFields=" + 
systemFields) +
+        '}';
+  }
+
+  // Returns a <containerName, path> pair given a location using the 
azureStorage schema.
+  public static Pair<String, String> 
getContainerAndPathFromObjectLocation(CloudObjectLocation location)
+  {
+    String[] pathParts = location.getPath().split("/", 2);
+    // If there is no path specified, use a empty path as azure will throw a 
exception that is more clear than a index error.
+    return Pair.of(pathParts[0], pathParts.length == 2 ? pathParts[1] : "");
   }
 }
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSourceConfig.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSourceConfig.java
new file mode 100644
index 00000000000..4a2d8d0cb97
--- /dev/null
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSourceConfig.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.azure;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * Contains properties for Azure input source.
+ * Properties can be specified by ingestionSpec which will override system 
default.
+ */
+public class AzureStorageAccountInputSourceConfig
+{
+  private final String sharedAccessStorageToken;
+  private final String key;
+  private final String appRegistrationClientId;
+  private final String appRegistrationClientSecret;
+  private final String tenantId;
+
+  @JsonCreator
+  public AzureStorageAccountInputSourceConfig(
+      @JsonProperty("sharedAccessStorageToken") @Nullable String 
sharedAccessStorageToken,
+      @JsonProperty("key") @Nullable String key,
+      @JsonProperty("appRegistrationClientId") @Nullable String 
appRegistrationClientId,
+      @JsonProperty("appRegistrationClientSecret") @Nullable String 
appRegistrationClientSecret,
+      @JsonProperty("tenantId") @Nullable String tenantId
+
+  )
+  {
+    this.sharedAccessStorageToken = sharedAccessStorageToken;
+    this.key = key;
+    this.appRegistrationClientId = appRegistrationClientId;
+    this.appRegistrationClientSecret = appRegistrationClientSecret;
+    this.tenantId = tenantId;
+  }
+
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public String getSharedAccessStorageToken()
+  {
+    return sharedAccessStorageToken;
+  }
+
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public String getAppRegistrationClientId()
+  {
+    return appRegistrationClientId;
+  }
+
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public String getAppRegistrationClientSecret()
+  {
+    return appRegistrationClientSecret;
+  }
+
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public String getTenantId()
+  {
+    return tenantId;
+  }
+
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public String getKey()
+  {
+    return key;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "AzureInputSourceConfig{" +
+        "sharedAccessStorageToken=" + sharedAccessStorageToken +
+        ", key=" + key +
+        ", appRegistrationClientId=" + appRegistrationClientId +
+        ", appRegistrationClientSecret=" + appRegistrationClientSecret +
+        ", tenantId=" + tenantId +
+        '}';
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    AzureStorageAccountInputSourceConfig that = 
(AzureStorageAccountInputSourceConfig) o;
+    return Objects.equals(key, that.key)
+        && Objects.equals(sharedAccessStorageToken, 
that.sharedAccessStorageToken)
+        && Objects.equals(appRegistrationClientId, 
that.appRegistrationClientId)
+        && Objects.equals(appRegistrationClientSecret, 
that.appRegistrationClientSecret)
+        && Objects.equals(tenantId, that.tenantId);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(
+        sharedAccessStorageToken,
+        key,
+        appRegistrationClientId,
+        appRegistrationClientSecret,
+        tenantId
+    );
+  }
+}
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSource.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSource.java
index dfbf6cb3385..537c0b60d37 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSource.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSource.java
@@ -40,7 +40,7 @@ public class AzureByteSource extends ByteSource
 
   @AssistedInject
   public AzureByteSource(
-      AzureStorage azureStorage,
+      @Assisted("azureStorage") AzureStorage azureStorage,
       @Assisted("containerName") String containerName,
       @Assisted("blobPath") String blobPath
   )
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSourceFactory.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSourceFactory.java
index 249e096b4c6..be3e5297e54 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSourceFactory.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSourceFactory.java
@@ -26,5 +26,9 @@ import com.google.inject.assistedinject.Assisted;
  */
 public interface AzureByteSourceFactory
 {
-  AzureByteSource create(@Assisted("containerName") String containerName, 
@Assisted("blobPath") String blobPath);
+  AzureByteSource create(
+      @Assisted("containerName") String containerName,
+      @Assisted("blobPath") String blobPath,
+      @Assisted("azureStorage") AzureStorage azureStorage
+  );
 }
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java
index d72fcd8395f..3c8d8e27de2 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java
@@ -28,7 +28,9 @@ import com.azure.storage.blob.BlobServiceClientBuilder;
 import com.azure.storage.blob.batch.BlobBatchClient;
 import com.azure.storage.blob.batch.BlobBatchClientBuilder;
 import com.azure.storage.common.StorageSharedKeyCredential;
+import org.apache.druid.java.util.common.Pair;
 
+import javax.annotation.Nullable;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
@@ -39,8 +41,8 @@ import java.util.Map;
 public class AzureClientFactory
 {
 
-  private final AzureAccountConfig config;
-  private final Map<Integer, BlobServiceClient> cachedBlobServiceClients;
+  protected final AzureAccountConfig config;
+  private final Map<Pair<String, Integer>, BlobServiceClient> 
cachedBlobServiceClients;
 
   public AzureClientFactory(AzureAccountConfig config)
   {
@@ -49,38 +51,26 @@ public class AzureClientFactory
   }
 
   // It's okay to store clients in a map here because all the configs for 
specifying azure retries are static, and there are only 2 of them.
-  // The 2 configs are AzureAccountConfig.maxTries and 
AzureOutputConfig.maxRetrr.
-  // We will only ever have at most 2 clients in cachedBlobServiceClients.
-  public BlobServiceClient getBlobServiceClient(Integer retryCount)
+  // The 2 configs are AzureAccountConfig.maxTries and 
AzureOutputConfig.maxRetry.
+  // We will only ever have at most 2 clients in cachedBlobServiceClients per 
storage account.
+  public BlobServiceClient getBlobServiceClient(@Nullable Integer retryCount, 
String storageAccount)
   {
-    if (!cachedBlobServiceClients.containsKey(retryCount)) {
-      BlobServiceClientBuilder clientBuilder = 
getAuthenticatedBlobServiceClientBuilder()
-          .retryOptions(new RetryOptions(
-              new ExponentialBackoffOptions()
-                  .setMaxRetries(retryCount != null ? retryCount : 
config.getMaxTries())
-                  .setBaseDelay(Duration.ofMillis(1000))
-                  .setMaxDelay(Duration.ofMillis(60000))
-          ));
-      cachedBlobServiceClients.put(retryCount, clientBuilder.buildClient());
-    }
-
-    return cachedBlobServiceClients.get(retryCount);
+    return cachedBlobServiceClients.computeIfAbsent(Pair.of(storageAccount, 
retryCount != null ? retryCount : config.getMaxTries()), key -> 
buildNewClient(key.rhs, key.lhs));
   }
 
-
   // Mainly here to make testing easier.
   public BlobBatchClient getBlobBatchClient(BlobContainerClient 
blobContainerClient)
   {
     return new BlobBatchClientBuilder(blobContainerClient).buildClient();
   }
 
-  private BlobServiceClientBuilder getAuthenticatedBlobServiceClientBuilder()
+  protected BlobServiceClient buildNewClient(Integer retryCount, String 
storageAccount)
   {
     BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder()
-        .endpoint("https://"; + config.getAccount() + "." + 
config.getBlobStorageEndpoint());
+        .endpoint("https://"; + storageAccount + "." + 
config.getBlobStorageEndpoint());
 
     if (config.getKey() != null) {
-      clientBuilder.credential(new 
StorageSharedKeyCredential(config.getAccount(), config.getKey()));
+      clientBuilder.credential(new StorageSharedKeyCredential(storageAccount, 
config.getKey()));
     } else if (config.getSharedAccessStorageToken() != null) {
       clientBuilder.sasToken(config.getSharedAccessStorageToken());
     } else if (config.getUseAzureCredentialsChain()) {
@@ -89,6 +79,13 @@ public class AzureClientFactory
           .managedIdentityClientId(config.getManagedIdentityClientId());
       clientBuilder.credential(defaultAzureCredentialBuilder.build());
     }
-    return clientBuilder;
+    return clientBuilder
+        .retryOptions(new RetryOptions(
+            new ExponentialBackoffOptions()
+                .setMaxRetries(retryCount)
+                .setBaseDelay(Duration.ofMillis(1000))
+                .setMaxDelay(Duration.ofMillis(60000))
+        ))
+        .buildClient();
   }
 }
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterable.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterable.java
index 55c51e58bb8..1900a46c4d4 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterable.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterable.java
@@ -34,22 +34,25 @@ public class AzureCloudBlobIterable implements 
Iterable<CloudBlobHolder>
   private final Iterable<URI> prefixes;
   private final int maxListingLength;
   private final AzureCloudBlobIteratorFactory azureCloudBlobIteratorFactory;
+  private final AzureStorage azureStorage;
 
   @AssistedInject
   public AzureCloudBlobIterable(
       AzureCloudBlobIteratorFactory azureCloudBlobIteratorFactory,
       @Assisted final Iterable<URI> prefixes,
-      @Assisted final int maxListingLength
+      @Assisted final int maxListingLength,
+      @Assisted final AzureStorage azureStorage
   )
   {
     this.azureCloudBlobIteratorFactory = azureCloudBlobIteratorFactory;
     this.prefixes = prefixes;
     this.maxListingLength = maxListingLength;
+    this.azureStorage = azureStorage;
   }
 
   @Override
   public Iterator<CloudBlobHolder> iterator()
   {
-    return azureCloudBlobIteratorFactory.create(prefixes, maxListingLength);
+    return azureCloudBlobIteratorFactory.create(prefixes, maxListingLength, 
azureStorage);
   }
 }
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterableFactory.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterableFactory.java
index ee038e2c06a..43eac7c76bf 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterableFactory.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterableFactory.java
@@ -26,5 +26,5 @@ import java.net.URI;
  */
 public interface AzureCloudBlobIterableFactory
 {
-  AzureCloudBlobIterable create(Iterable<URI> prefixes, int maxListingLength);
+  AzureCloudBlobIterable create(Iterable<URI> prefixes, int maxListingLength, 
AzureStorage azureStorage);
 }
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java
index 95ca3c08ae8..d57238ccb3a 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java
@@ -22,6 +22,9 @@ package org.apache.druid.storage.azure;
 import com.azure.storage.blob.models.BlobItem;
 import com.google.inject.assistedinject.Assisted;
 import com.google.inject.assistedinject.AssistedInject;
+import org.apache.druid.data.input.azure.AzureStorageAccountInputSource;
+import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.storage.azure.blob.CloudBlobHolder;
@@ -37,9 +40,10 @@ import java.util.NoSuchElementException;
 public class AzureCloudBlobIterator implements Iterator<CloudBlobHolder>
 {
   private static final Logger log = new Logger(AzureCloudBlobIterator.class);
-  private final AzureStorage storage;
   private final Iterator<URI> prefixesIterator;
   private final int maxListingLength;
+  private AzureStorage storage;
+  private String currentStorageAccount;
   private String currentContainer;
   private String currentPrefix;
   private CloudBlobHolder currentBlobItem;
@@ -48,16 +52,17 @@ public class AzureCloudBlobIterator implements 
Iterator<CloudBlobHolder>
 
   @AssistedInject
   AzureCloudBlobIterator(
-      AzureStorage storage,
+      @Assisted AzureStorage azureStorage,
       AzureAccountConfig config,
       @Assisted final Iterable<URI> prefixes,
       @Assisted final int maxListingLength
   )
   {
-    this.storage = storage;
+    this.storage = azureStorage;
     this.config = config;
     this.prefixesIterator = prefixes.iterator();
     this.maxListingLength = maxListingLength;
+    this.currentStorageAccount = null;
     this.currentContainer = null;
     this.currentPrefix = null;
     this.currentBlobItem = null;
@@ -91,8 +96,18 @@ public class AzureCloudBlobIterator implements 
Iterator<CloudBlobHolder>
   private void prepareNextRequest()
   {
     URI currentUri = prefixesIterator.next();
-    currentContainer = currentUri.getAuthority();
-    currentPrefix = AzureUtils.extractAzureKey(currentUri);
+
+    if (currentUri.getScheme().equals(AzureStorageAccountInputSource.SCHEME)) {
+      CloudObjectLocation cloudObjectLocation = new 
CloudObjectLocation(currentUri);
+      Pair<String, String> containerInfo = 
AzureStorageAccountInputSource.getContainerAndPathFromObjectLocation(cloudObjectLocation);
+      currentStorageAccount = cloudObjectLocation.getBucket();
+      currentContainer = containerInfo.lhs;
+      currentPrefix = containerInfo.rhs;
+    } else {
+      currentStorageAccount = config.getAccount();
+      currentContainer = currentUri.getAuthority();
+      currentPrefix = AzureUtils.extractAzureKey(currentUri);
+    }
     log.debug("currentUri: %s\ncurrentContainer: %s\ncurrentPrefix: %s",
               currentUri, currentContainer, currentPrefix
     );
@@ -109,6 +124,7 @@ public class AzureCloudBlobIterator implements 
Iterator<CloudBlobHolder>
       );
       // We don't need to iterate by page because the client handles this, it 
will fetch the next page when necessary.
       blobItemIterator = storage.listBlobsWithPrefixInContainerSegmented(
+          currentStorageAccount,
           currentContainer,
           currentPrefix,
           maxListingLength,
@@ -135,7 +151,7 @@ public class AzureCloudBlobIterator implements 
Iterator<CloudBlobHolder>
       while (blobItemIterator.hasNext()) {
         BlobItem blobItem = blobItemIterator.next();
         if (!blobItem.isPrefix() && 
blobItem.getProperties().getContentLength() > 0) {
-          currentBlobItem = new CloudBlobHolder(blobItem, currentContainer);
+          currentBlobItem = new CloudBlobHolder(blobItem, currentContainer, 
currentStorageAccount);
           return;
         }
       }
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorFactory.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorFactory.java
index 0197c96e247..da17bcb6ff9 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorFactory.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorFactory.java
@@ -26,5 +26,5 @@ import java.net.URI;
  */
 public interface AzureCloudBlobIteratorFactory
 {
-  AzureCloudBlobIterator create(Iterable<URI> prefixes, int maxListingLength);
+  AzureCloudBlobIterator create(Iterable<URI> prefixes, int maxListingLength, 
AzureStorage azureStorage);
 }
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java
index 0b8ccf6daee..5429b845ef6 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java
@@ -22,6 +22,7 @@ package org.apache.druid.storage.azure;
 import com.azure.storage.blob.models.BlobStorageException;
 import com.google.common.base.Predicates;
 import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Global;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.MapUtils;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -51,7 +52,7 @@ public class AzureDataSegmentKiller implements 
DataSegmentKiller
       AzureDataSegmentConfig segmentConfig,
       AzureInputDataConfig inputDataConfig,
       AzureAccountConfig accountConfig,
-      final AzureStorage azureStorage,
+      @Global final AzureStorage azureStorage,
       AzureCloudBlobIterableFactory azureCloudBlobIterableFactory
   )
   {
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPuller.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPuller.java
index c20413b1169..44094daade6 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPuller.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPuller.java
@@ -21,6 +21,7 @@ package org.apache.druid.storage.azure;
 
 import com.google.common.io.ByteSource;
 import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Global;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.loading.SegmentLoadingException;
@@ -37,16 +38,19 @@ public class AzureDataSegmentPuller
   private static final Logger log = new Logger(AzureDataSegmentPuller.class);
 
   private final AzureByteSourceFactory byteSourceFactory;
+  private final AzureStorage azureStorage;
 
   private final AzureAccountConfig azureAccountConfig;
 
   @Inject
   public AzureDataSegmentPuller(
       AzureByteSourceFactory byteSourceFactory,
+      @Global AzureStorage azureStorage,
       AzureAccountConfig azureAccountConfig
   )
   {
     this.byteSourceFactory = byteSourceFactory;
+    this.azureStorage = azureStorage;
     this.azureAccountConfig = azureAccountConfig;
   }
 
@@ -66,7 +70,7 @@ public class AzureDataSegmentPuller
 
       final String actualBlobPath = 
AzureUtils.maybeRemoveAzurePathPrefix(blobPath, 
azureAccountConfig.getBlobStorageEndpoint());
 
-      final ByteSource byteSource = byteSourceFactory.create(containerName, 
actualBlobPath);
+      final ByteSource byteSource = byteSourceFactory.create(containerName, 
actualBlobPath, azureStorage);
       final FileUtils.FileCopyResult result = CompressionUtils.unzip(
           byteSource,
           outDir,
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java
index 8180b362b66..1b48d10c315 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java
@@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Global;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.SegmentUtils;
@@ -51,7 +52,7 @@ public class AzureDataSegmentPusher implements 
DataSegmentPusher
 
   @Inject
   public AzureDataSegmentPusher(
-      AzureStorage azureStorage,
+      @Global AzureStorage azureStorage,
       AzureAccountConfig accountConfig,
       AzureDataSegmentConfig segmentConfig
   )
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureIngestClientFactory.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureIngestClientFactory.java
new file mode 100644
index 00000000000..aaf27127939
--- /dev/null
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureIngestClientFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure;
+
+import com.azure.core.http.policy.ExponentialBackoffOptions;
+import com.azure.core.http.policy.RetryOptions;
+import com.azure.identity.ClientSecretCredentialBuilder;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import org.apache.druid.data.input.azure.AzureStorageAccountInputSourceConfig;
+
+import javax.annotation.Nullable;
+import java.time.Duration;
+
+
+public class AzureIngestClientFactory extends AzureClientFactory
+{
+  private final AzureStorageAccountInputSourceConfig 
azureStorageAccountInputSourceConfig;
+
+  public AzureIngestClientFactory(AzureAccountConfig config, @Nullable 
AzureStorageAccountInputSourceConfig azureStorageAccountInputSourceConfig)
+  {
+    super(config);
+    this.azureStorageAccountInputSourceConfig = 
azureStorageAccountInputSourceConfig;
+  }
+
+  @Override
+  public BlobServiceClient buildNewClient(Integer retryCount, String 
storageAccount)
+  {
+    BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder()
+        .endpoint("https://"; + storageAccount + "." + 
config.getBlobStorageEndpoint());
+
+    if (azureStorageAccountInputSourceConfig == null) {
+      // If properties is not passed in inputSpec, use default azure 
credentials.
+      return super.buildNewClient(retryCount, storageAccount);
+    }
+
+    if (azureStorageAccountInputSourceConfig.getKey() != null) {
+      clientBuilder.credential(new StorageSharedKeyCredential(storageAccount, 
azureStorageAccountInputSourceConfig.getKey()));
+    } else if 
(azureStorageAccountInputSourceConfig.getSharedAccessStorageToken() != null) {
+      
clientBuilder.sasToken(azureStorageAccountInputSourceConfig.getSharedAccessStorageToken());
+    } else if 
(azureStorageAccountInputSourceConfig.getAppRegistrationClientId() != null && 
azureStorageAccountInputSourceConfig.getAppRegistrationClientSecret() != null) {
+      clientBuilder.credential(new ClientSecretCredentialBuilder()
+          
.clientSecret(azureStorageAccountInputSourceConfig.getAppRegistrationClientSecret())
+          
.clientId(azureStorageAccountInputSourceConfig.getAppRegistrationClientId())
+          .tenantId(azureStorageAccountInputSourceConfig.getTenantId())
+          .build()
+      );
+    } else {
+      // No credentials set in properties, use default azurecredentials.
+      return super.buildNewClient(retryCount, storageAccount);
+    }
+    clientBuilder.retryOptions(new RetryOptions(
+        new ExponentialBackoffOptions()
+            .setMaxRetries(retryCount != null ? retryCount : 
config.getMaxTries())
+            .setBaseDelay(Duration.ofMillis(1000))
+            .setMaxDelay(Duration.ofMillis(60000))
+    ));
+    return clientBuilder.buildClient();
+  }
+}
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
index b929d255c1c..9cf5952bd81 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
@@ -61,12 +61,15 @@ public class AzureStorage
   private static final Logger log = new Logger(AzureStorage.class);
 
   private final AzureClientFactory azureClientFactory;
+  private final String defaultStorageAccount;
 
   public AzureStorage(
-      AzureClientFactory azureClientFactory
+      AzureClientFactory azureClientFactory,
+      @Nullable String defaultStorageAccount
   )
   {
     this.azureClientFactory = azureClientFactory;
+    this.defaultStorageAccount = defaultStorageAccount;
   }
 
   public List<String> emptyCloudBlobDirectory(final String containerName, 
final String virtualDirPath)
@@ -216,18 +219,27 @@ public class AzureStorage
   @VisibleForTesting
   BlobServiceClient getBlobServiceClient(Integer maxAttempts)
   {
-    return azureClientFactory.getBlobServiceClient(maxAttempts);
+    return azureClientFactory.getBlobServiceClient(maxAttempts, 
defaultStorageAccount);
   }
 
+  @VisibleForTesting
+  BlobServiceClient getBlobServiceClient(String storageAccount, Integer 
maxAttempts)
+  {
+    return azureClientFactory.getBlobServiceClient(maxAttempts, 
storageAccount);
+  }
+
+  // This method is used in AzureCloudBlobIterator in a method where one 
azureStorage instance might need to list from multiple
+  // storage accounts, so storageAccount is a valid parameter.
   @VisibleForTesting
   PagedIterable<BlobItem> listBlobsWithPrefixInContainerSegmented(
+      final String storageAccount,
       final String containerName,
       final String prefix,
       int maxResults,
       Integer maxAttempts
   ) throws BlobStorageException
   {
-    BlobContainerClient blobContainerClient = 
getOrCreateBlobContainerClient(containerName, maxAttempts);
+    BlobContainerClient blobContainerClient = 
getOrCreateBlobContainerClient(storageAccount, containerName, maxAttempts);
     return blobContainerClient.listBlobs(
         new 
ListBlobsOptions().setPrefix(prefix).setMaxResultsPerPage(maxResults),
         Duration.ofMillis(DELTA_BACKOFF_MS)
@@ -243,4 +255,9 @@ public class AzureStorage
   {
     return 
getBlobServiceClient(maxRetries).createBlobContainerIfNotExists(containerName);
   }
+
+  private BlobContainerClient getOrCreateBlobContainerClient(final String 
storageAccount, final String containerName, final Integer maxRetries)
+  {
+    return getBlobServiceClient(storageAccount, 
maxRetries).createBlobContainerIfNotExists(containerName);
+  }
 }
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
index 25db821601b..19aa5c7fbbe 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
@@ -31,9 +31,11 @@ import org.apache.commons.lang3.BooleanUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.druid.data.input.azure.AzureEntityFactory;
 import org.apache.druid.data.input.azure.AzureInputSource;
+import org.apache.druid.data.input.azure.AzureStorageAccountInputSource;
 import org.apache.druid.guice.Binders;
 import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.annotations.Global;
 import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.java.util.common.ISE;
 
@@ -46,10 +48,6 @@ public class AzureStorageDruidModule implements DruidModule
 {
 
   public static final String SCHEME = "azure";
-  public static final String
-      STORAGE_CONNECTION_STRING_WITH_KEY = 
"DefaultEndpointsProtocol=%s;AccountName=%s;AccountKey=%s;EndpointSuffix=%s;";
-  public static final String
-      STORAGE_CONNECTION_STRING_WITH_TOKEN = 
"DefaultEndpointsProtocol=%s;AccountName=%s;SharedAccessSignature=%s;EndpointSuffix=%s;";
   public static final String INDEX_ZIP_FILE_NAME = "index.zip";
 
   @Override
@@ -77,7 +75,8 @@ public class AzureStorageDruidModule implements DruidModule
           }
         },
         new SimpleModule().registerSubtypes(
-            new NamedType(AzureInputSource.class, SCHEME)
+            new NamedType(AzureInputSource.class, SCHEME),
+            new NamedType(AzureStorageAccountInputSource.class, 
AzureStorageAccountInputSource.SCHEME)
         )
     );
   }
@@ -135,11 +134,13 @@ public class AzureStorageDruidModule implements 
DruidModule
   }
 
   @Provides
+  @Global
   @LazySingleton
   public AzureStorage getAzureStorageContainer(
-      final AzureClientFactory azureClientFactory
+      final AzureClientFactory azureClientFactory,
+      final AzureAccountConfig azureAccountConfig
   )
   {
-    return new AzureStorage(azureClientFactory);
+    return new AzureStorage(azureClientFactory, 
azureAccountConfig.getAccount());
   }
 }
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
index 74e928edc67..ed881d52fcc 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
@@ -23,6 +23,7 @@ import com.azure.storage.blob.models.BlobStorageException;
 import com.google.common.base.Optional;
 import com.google.inject.Inject;
 import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
+import org.apache.druid.guice.annotations.Global;
 import org.apache.druid.java.util.common.IOE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -53,7 +54,7 @@ public class AzureTaskLogs implements TaskLogs
       AzureTaskLogsConfig config,
       AzureInputDataConfig inputDataConfig,
       AzureAccountConfig accountConfig,
-      AzureStorage azureStorage,
+      @Global AzureStorage azureStorage,
       AzureCloudBlobIterableFactory azureCloudBlobIterableFactory,
       CurrentTimeMillisSupplier timeSupplier)
   {
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java
index 0c00a633d77..e214ab909ef 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java
@@ -126,7 +126,7 @@ public class AzureUtils
         azureCloudBlobIterableFactory.create(ImmutableList.of(new 
CloudObjectLocation(
             bucket,
             prefix
-        ).toUri("azure")), config.getMaxListingLength());
+        ).toUri("azure")), config.getMaxListingLength(), storage);
     Iterator<CloudBlobHolder> iterator = azureCloudBlobIterable.iterator();
 
     while (iterator.hasNext()) {
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/CloudBlobHolder.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/CloudBlobHolder.java
index d2398aebda1..59ed10ed770 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/CloudBlobHolder.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/CloudBlobHolder.java
@@ -30,11 +30,13 @@ public class CloudBlobHolder
 {
   private final BlobItem delegate;
   private final String container;
+  private final String storageAccount;
 
-  public CloudBlobHolder(BlobItem delegate, String container)
+  public CloudBlobHolder(BlobItem delegate, String container, String 
storageAccount)
   {
     this.delegate = delegate;
     this.container = container;
+    this.storageAccount = storageAccount;
   }
 
   public String getContainerName()
@@ -42,6 +44,11 @@ public class CloudBlobHolder
     return container;
   }
 
+  public String getStorageAccount()
+  {
+    return storageAccount;
+  }
+
   public String getName()
   {
     return delegate.getName();
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java
index 4264801f4ac..79be724c17f 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.guice.annotations.Global;
 import org.apache.druid.java.util.common.HumanReadableBytes;
 import org.apache.druid.storage.StorageConnector;
 import org.apache.druid.storage.StorageConnectorProvider;
@@ -36,6 +37,7 @@ import java.io.File;
 public class AzureStorageConnectorProvider extends AzureOutputConfig 
implements StorageConnectorProvider
 {
 
+  @Global
   @JacksonInject
   AzureStorage azureStorage;
 
diff --git 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureEntityTest.java
 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureEntityTest.java
index 0ff858336bb..8453b71b4b7 100644
--- 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureEntityTest.java
+++ 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureEntityTest.java
@@ -24,6 +24,7 @@ import org.apache.commons.io.input.NullInputStream;
 import org.apache.druid.data.input.impl.CloudObjectLocation;
 import org.apache.druid.storage.azure.AzureByteSource;
 import org.apache.druid.storage.azure.AzureByteSourceFactory;
+import org.apache.druid.storage.azure.AzureStorage;
 import org.apache.druid.storage.azure.AzureUtils;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
@@ -37,6 +38,7 @@ import java.net.URI;
 
 public class AzureEntityTest extends EasyMockSupport
 {
+  private static final String STORAGE_ACCOUNT_NAME = "storageAccount";
   private static final String CONTAINER_NAME = "container";
   private static final String BLOB_NAME = "blob";
   private static final int OFFSET = 20;
@@ -49,6 +51,7 @@ public class AzureEntityTest extends EasyMockSupport
   private AzureByteSource byteSource;
 
   private AzureEntity azureEntity;
+  private AzureStorage azureStorage;
 
   static {
     try {
@@ -65,6 +68,7 @@ public class AzureEntityTest extends EasyMockSupport
     location = createMock(CloudObjectLocation.class);
     byteSourceFactory = createMock(AzureByteSourceFactory.class);
     byteSource = createMock(AzureByteSource.class);
+    azureStorage = createMock(AzureStorage.class);
   }
 
   @Test
@@ -72,11 +76,11 @@ public class AzureEntityTest extends EasyMockSupport
   {
     EasyMock.expect(location.getBucket()).andReturn(CONTAINER_NAME);
     EasyMock.expect(location.getPath()).andReturn(BLOB_NAME);
-    EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, 
BLOB_NAME)).andReturn(byteSource);
+    EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_NAME, 
azureStorage)).andReturn(byteSource);
     
EasyMock.expect(location.toUri(AzureInputSource.SCHEME)).andReturn(ENTITY_URI);
     replayAll();
 
-    azureEntity = new AzureEntity(location, byteSourceFactory);
+    azureEntity = new AzureEntity(location, azureStorage, 
AzureInputSource.SCHEME, byteSourceFactory);
 
     URI actualUri = azureEntity.getUri();
     Assert.assertEquals(ENTITY_URI, actualUri);
@@ -85,16 +89,38 @@ public class AzureEntityTest extends EasyMockSupport
 
   }
 
+  @Test
+  public void test_getUri_returnsLocationUri_azureStorageScheme()
+  {
+    EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_NAME, 
azureStorage)).andReturn(byteSource);
+    replayAll();
+
+    azureEntity = new AzureEntity(
+        new CloudObjectLocation(STORAGE_ACCOUNT_NAME, CONTAINER_NAME + "/" + 
BLOB_NAME),
+        azureStorage,
+        AzureStorageAccountInputSource.SCHEME,
+        byteSourceFactory
+    );
+
+    Assert.assertEquals(
+        URI.create(AzureStorageAccountInputSource.SCHEME + "://" + 
STORAGE_ACCOUNT_NAME + "/" + CONTAINER_NAME + "/" + BLOB_NAME),
+        azureEntity.getUri()
+    );
+
+    verifyAll();
+
+  }
+
   @Test
   public void test_readFromStart_returnsExpectedStream() throws Exception
   {
     EasyMock.expect(location.getBucket()).andReturn(CONTAINER_NAME);
     EasyMock.expect(location.getPath()).andReturn(BLOB_NAME);
     EasyMock.expect(byteSource.openStream(0)).andReturn(INPUT_STREAM);
-    EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, 
BLOB_NAME)).andReturn(byteSource);
+    EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_NAME, 
azureStorage)).andReturn(byteSource);
     replayAll();
 
-    azureEntity = new AzureEntity(location, byteSourceFactory);
+    azureEntity = new AzureEntity(location, azureStorage, 
AzureInputSource.SCHEME, byteSourceFactory);
 
     InputStream actualInputStream = azureEntity.readFrom(0);
     Assert.assertSame(INPUT_STREAM, actualInputStream);
@@ -106,10 +132,10 @@ public class AzureEntityTest extends EasyMockSupport
     EasyMock.expect(location.getBucket()).andReturn(CONTAINER_NAME);
     EasyMock.expect(location.getPath()).andReturn(BLOB_NAME);
     EasyMock.expect(byteSource.openStream(OFFSET)).andReturn(INPUT_STREAM);
-    EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, 
BLOB_NAME)).andReturn(byteSource);
+    EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_NAME, 
azureStorage)).andReturn(byteSource);
     replayAll();
 
-    azureEntity = new AzureEntity(location, byteSourceFactory);
+    azureEntity = new AzureEntity(location, azureStorage, 
AzureInputSource.SCHEME, byteSourceFactory);
 
     InputStream actualInputStream = azureEntity.readFrom(OFFSET);
     Assert.assertSame(INPUT_STREAM, actualInputStream);
@@ -122,10 +148,10 @@ public class AzureEntityTest extends EasyMockSupport
       EasyMock.expect(location.getBucket()).andReturn(CONTAINER_NAME);
       EasyMock.expect(location.getPath()).andReturn(BLOB_NAME);
       EasyMock.expect(byteSource.openStream(OFFSET)).andThrow(IO_EXCEPTION);
-      EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, 
BLOB_NAME)).andReturn(byteSource);
+      EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_NAME, 
azureStorage)).andReturn(byteSource);
       replayAll();
 
-      azureEntity = new AzureEntity(location, byteSourceFactory);
+      azureEntity = new AzureEntity(location, azureStorage, 
AzureInputSource.SCHEME, byteSourceFactory);
       azureEntity.readFrom(OFFSET);
     }
     catch (IOException e) {
@@ -138,25 +164,45 @@ public class AzureEntityTest extends EasyMockSupport
   {
     EasyMock.expect(location.getBucket()).andReturn(CONTAINER_NAME);
     EasyMock.expect(location.getPath()).andReturn(BLOB_NAME).atLeastOnce();
-    EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, 
BLOB_NAME)).andReturn(byteSource);
+    EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_NAME, 
azureStorage)).andReturn(byteSource);
     replayAll();
 
-    azureEntity = new AzureEntity(location, byteSourceFactory);
+    azureEntity = new AzureEntity(location, azureStorage, 
AzureInputSource.SCHEME, byteSourceFactory);
     String actualPath = azureEntity.getPath();
 
     Assert.assertEquals(BLOB_NAME, actualPath);
     verifyAll();
   }
 
+  @Test
+  public void test_getPath_azureStorageScheme()
+  {
+    EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_NAME, 
azureStorage)).andReturn(byteSource);
+    replayAll();
+
+    azureEntity = new AzureEntity(
+        new CloudObjectLocation(STORAGE_ACCOUNT_NAME, CONTAINER_NAME + "/" + 
BLOB_NAME),
+        azureStorage,
+        AzureStorageAccountInputSource.SCHEME,
+        byteSourceFactory
+    );
+
+    Assert.assertEquals(
+        CONTAINER_NAME + "/" + BLOB_NAME,
+        azureEntity.getPath()
+    );
+
+    verifyAll();
+  }
   @Test
   public void test_getRetryCondition_returnsExpectedRetryCondition()
   {
     EasyMock.expect(location.getBucket()).andReturn(CONTAINER_NAME);
     EasyMock.expect(location.getPath()).andReturn(BLOB_NAME).atLeastOnce();
-    EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, 
BLOB_NAME)).andReturn(byteSource);
+    EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_NAME, 
azureStorage)).andReturn(byteSource);
     replayAll();
 
-    azureEntity = new AzureEntity(location, byteSourceFactory);
+    azureEntity = new AzureEntity(location, azureStorage, 
AzureInputSource.SCHEME, byteSourceFactory);
     Predicate<Throwable> actualRetryCondition = 
azureEntity.getRetryCondition();
     Assert.assertSame(AzureUtils.AZURE_RETRY, actualRetryCondition);
   }
diff --git 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceSerdeTest.java
 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceSerdeTest.java
index 709e8c191c1..e11ce8ba548 100644
--- 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceSerdeTest.java
+++ 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceSerdeTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
 import org.apache.druid.data.input.impl.CloudObjectLocation;
 import org.apache.druid.data.input.impl.systemfield.SystemField;
 import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.storage.azure.AzureAccountConfig;
 import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
 import org.apache.druid.storage.azure.AzureDataSegmentConfig;
 import org.apache.druid.storage.azure.AzureInputDataConfig;
@@ -74,6 +75,8 @@ public class AzureInputSourceSerdeTest extends EasyMockSupport
   private AzureEntityFactory entityFactory;
   private AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
   private AzureInputDataConfig inputDataConfig;
+  private AzureAccountConfig accountConfig;
+
 
   static {
     try {
@@ -96,7 +99,7 @@ public class AzureInputSourceSerdeTest extends EasyMockSupport
     entityFactory = createMock(AzureEntityFactory.class);
     azureCloudBlobIterableFactory = 
createMock(AzureCloudBlobIterableFactory.class);
     inputDataConfig = createMock(AzureInputDataConfig.class);
-
+    accountConfig = createMock(AzureAccountConfig.class);
   }
 
   @Test
@@ -177,6 +180,7 @@ public class AzureInputSourceSerdeTest extends 
EasyMockSupport
     injectableValues.addValue(AzureEntityFactory.class, entityFactory);
     injectableValues.addValue(AzureCloudBlobIterableFactory.class, 
azureCloudBlobIterableFactory);
     injectableValues.addValue(AzureInputDataConfig.class, inputDataConfig);
+    injectableValues.addValue(AzureAccountConfig.class, accountConfig);
     return injectableValues;
   }
 
diff --git 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
index 692c8dfeeba..a7cb7c708c6 100644
--- 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
+++ 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.data.input.azure;
 
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobContainerClientBuilder;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterators;
@@ -130,7 +132,7 @@ public class AzureInputSourceTest extends EasyMockSupport
   @Test
   public void test_createEntity_returnsExpectedEntity()
   {
-    
EasyMock.expect(entityFactory.create(CLOUD_OBJECT_LOCATION_1)).andReturn(azureEntity1);
+    EasyMock.expect(entityFactory.create(CLOUD_OBJECT_LOCATION_1, storage, 
AzureInputSource.SCHEME)).andReturn(azureEntity1);
     
EasyMock.expect(inputSplit.get()).andReturn(ImmutableList.of(CLOUD_OBJECT_LOCATION_1)).times(2);
     replayAll();
 
@@ -161,7 +163,7 @@ public class AzureInputSourceTest extends EasyMockSupport
     List<CloudBlobHolder> expectedCloudBlobs = 
ImmutableList.of(cloudBlobDruid1);
     Iterator<CloudBlobHolder> expectedCloudBlobsIterator = 
expectedCloudBlobs.iterator();
     
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_LISTING_LENGTH);
-    EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes, 
MAX_LISTING_LENGTH)).andReturn(
+    EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes, 
MAX_LISTING_LENGTH, storage)).andReturn(
         azureCloudBlobIterable);
     
EasyMock.expect(azureCloudBlobIterable.iterator()).andReturn(expectedCloudBlobsIterator);
     
EasyMock.expect(cloudBlobDruid1.getContainerName()).andReturn(CONTAINER).anyTimes();
@@ -209,7 +211,7 @@ public class AzureInputSourceTest extends EasyMockSupport
     );
 
     
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_LISTING_LENGTH);
-    EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes, 
MAX_LISTING_LENGTH)).andReturn(
+    EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes, 
MAX_LISTING_LENGTH, storage)).andReturn(
         azureCloudBlobIterable);
     
EasyMock.expect(azureCloudBlobIterable.iterator()).andReturn(expectedCloudBlobsIterator);
     
EasyMock.expect(cloudBlobDruid1.getBlobLength()).andReturn(100L).anyTimes();
@@ -357,7 +359,9 @@ public class AzureInputSourceTest extends EasyMockSupport
 
     final AzureEntity entity = new AzureEntity(
         new CloudObjectLocation("foo", "bar"),
-        (containerName, blobPath) -> null
+        storage,
+        AzureInputSource.SCHEME,
+        (containerName, blobPath, storage) -> null
     );
 
     Assert.assertEquals("azure://foo/bar", 
azureInputSource.getSystemFieldValue(entity, SystemField.URI));
@@ -371,7 +375,8 @@ public class AzureInputSourceTest extends EasyMockSupport
     EqualsVerifier.forClass(AzureInputSource.class)
                   .usingGetClass()
                   .withPrefabValues(Logger.class, new 
Logger(AzureStorage.class), new Logger(AzureStorage.class))
-                  .withPrefabValues(AzureStorage.class, new 
AzureStorage(null), new AzureStorage(null))
+                  .withPrefabValues(BlobContainerClient.class, new 
BlobContainerClientBuilder().buildClient(), new 
BlobContainerClientBuilder().buildClient())
+                  .withPrefabValues(AzureStorage.class, new AzureStorage(null, 
null), new AzureStorage(null, null))
                   .withNonnullFields("storage")
                   .withNonnullFields("entityFactory")
                   .withNonnullFields("azureCloudBlobIterableFactory")
diff --git 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSourceTest.java
similarity index 63%
copy from 
extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
copy to 
extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSourceTest.java
index 692c8dfeeba..8d17d9ba01e 100644
--- 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
+++ 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSourceTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.data.input.azure;
 
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobContainerClientBuilder;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterators;
@@ -31,10 +33,13 @@ import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.data.input.impl.systemfield.SystemField;
 import org.apache.druid.data.input.impl.systemfield.SystemFields;
+import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.storage.azure.AzureAccountConfig;
 import org.apache.druid.storage.azure.AzureCloudBlobIterable;
 import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
+import org.apache.druid.storage.azure.AzureIngestClientFactory;
 import org.apache.druid.storage.azure.AzureInputDataConfig;
 import org.apache.druid.storage.azure.AzureStorage;
 import org.apache.druid.storage.azure.blob.CloudBlobHolder;
@@ -55,17 +60,18 @@ import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-public class AzureInputSourceTest extends EasyMockSupport
+public class AzureStorageAccountInputSourceTest extends EasyMockSupport
 {
-  private static final String CONTAINER_NAME = "container";
   private static final String BLOB_NAME = "blob";
   private static final URI PREFIX_URI;
   private final List<URI> EMPTY_URIS = ImmutableList.of();
   private final List<URI> EMPTY_PREFIXES = ImmutableList.of();
   private final List<CloudObjectLocation> EMPTY_OBJECTS = ImmutableList.of();
+  private static final String STORAGE_ACCOUNT = "STORAGE_ACCOUNT";
+  private static final String DEFAULT_STORAGE_ACCOUNT = 
"DEFAULT_STORAGE_ACCOUNT";
   private static final String CONTAINER = "CONTAINER";
   private static final String BLOB_PATH = "BLOB_PATH.csv";
-  private static final CloudObjectLocation CLOUD_OBJECT_LOCATION_1 = new 
CloudObjectLocation(CONTAINER, BLOB_PATH);
+  private static final CloudObjectLocation CLOUD_OBJECT_LOCATION_1 = new 
CloudObjectLocation(STORAGE_ACCOUNT, CONTAINER + "/" + BLOB_PATH);
   private static final int MAX_LISTING_LENGTH = 10;
 
   private static final InputFormat INPUT_FORMAT = new JsonInputFormat(
@@ -80,17 +86,19 @@ public class AzureInputSourceTest extends EasyMockSupport
   private AzureEntityFactory entityFactory;
   private AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
   private AzureInputDataConfig inputDataConfig;
+  private AzureStorageAccountInputSourceConfig 
azureStorageAccountInputSourceConfig;
+  private AzureAccountConfig azureAccountConfig;
 
   private InputSplit<List<CloudObjectLocation>> inputSplit;
   private AzureEntity azureEntity1;
   private CloudBlobHolder cloudBlobDruid1;
   private AzureCloudBlobIterable azureCloudBlobIterable;
 
-  private AzureInputSource azureInputSource;
+  private AzureStorageAccountInputSource azureInputSource;
 
   static {
     try {
-      PREFIX_URI = new URI(AzureInputSource.SCHEME + "://" + CONTAINER_NAME + 
"/" + BLOB_NAME);
+      PREFIX_URI = new URI(AzureStorageAccountInputSource.SCHEME + "://" + 
STORAGE_ACCOUNT + "/" + CONTAINER + "/" + BLOB_NAME);
     }
     catch (Exception e) {
       throw new RuntimeException(e);
@@ -108,21 +116,25 @@ public class AzureInputSourceTest extends EasyMockSupport
     inputDataConfig = createMock(AzureInputDataConfig.class);
     cloudBlobDruid1 = createMock(CloudBlobHolder.class);
     azureCloudBlobIterable = createMock(AzureCloudBlobIterable.class);
+    azureStorageAccountInputSourceConfig = 
createMock(AzureStorageAccountInputSourceConfig.class);
+    azureAccountConfig = createMock(AzureAccountConfig.class);
+    
EasyMock.expect(azureAccountConfig.getAccount()).andReturn(DEFAULT_STORAGE_ACCOUNT).anyTimes();
   }
 
   @Test(expected = IllegalArgumentException.class)
   public void 
test_constructor_emptyUrisEmptyPrefixesEmptyObjects_throwsIllegalArgumentException()
   {
     replayAll();
-    azureInputSource = new AzureInputSource(
-        storage,
+    azureInputSource = new AzureStorageAccountInputSource(
         entityFactory,
         azureCloudBlobIterableFactory,
         inputDataConfig,
+        azureAccountConfig,
         EMPTY_URIS,
         EMPTY_PREFIXES,
         EMPTY_OBJECTS,
         null,
+        azureStorageAccountInputSourceConfig,
         null
     );
   }
@@ -130,20 +142,21 @@ public class AzureInputSourceTest extends EasyMockSupport
   @Test
   public void test_createEntity_returnsExpectedEntity()
   {
-    
EasyMock.expect(entityFactory.create(CLOUD_OBJECT_LOCATION_1)).andReturn(azureEntity1);
+    EasyMock.expect(entityFactory.create(EasyMock.eq(CLOUD_OBJECT_LOCATION_1), 
EasyMock.anyObject(AzureStorage.class), 
EasyMock.eq(AzureStorageAccountInputSource.SCHEME))).andReturn(azureEntity1);
     
EasyMock.expect(inputSplit.get()).andReturn(ImmutableList.of(CLOUD_OBJECT_LOCATION_1)).times(2);
     replayAll();
 
     List<CloudObjectLocation> objects = 
ImmutableList.of(CLOUD_OBJECT_LOCATION_1);
-    azureInputSource = new AzureInputSource(
-        storage,
+    azureInputSource = new AzureStorageAccountInputSource(
         entityFactory,
         azureCloudBlobIterableFactory,
         inputDataConfig,
+        azureAccountConfig,
         EMPTY_URIS,
         EMPTY_PREFIXES,
         objects,
         null,
+        azureStorageAccountInputSourceConfig,
         null
     );
 
@@ -161,23 +174,25 @@ public class AzureInputSourceTest extends EasyMockSupport
     List<CloudBlobHolder> expectedCloudBlobs = 
ImmutableList.of(cloudBlobDruid1);
     Iterator<CloudBlobHolder> expectedCloudBlobsIterator = 
expectedCloudBlobs.iterator();
     
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_LISTING_LENGTH);
-    EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes, 
MAX_LISTING_LENGTH)).andReturn(
+    
EasyMock.expect(azureCloudBlobIterableFactory.create(EasyMock.eq(prefixes), 
EasyMock.eq(MAX_LISTING_LENGTH), 
EasyMock.anyObject(AzureStorage.class))).andReturn(
         azureCloudBlobIterable);
     
EasyMock.expect(azureCloudBlobIterable.iterator()).andReturn(expectedCloudBlobsIterator);
+    
EasyMock.expect(cloudBlobDruid1.getStorageAccount()).andReturn(STORAGE_ACCOUNT).anyTimes();
     
EasyMock.expect(cloudBlobDruid1.getContainerName()).andReturn(CONTAINER).anyTimes();
     EasyMock.expect(cloudBlobDruid1.getName()).andReturn(BLOB_PATH).anyTimes();
     
EasyMock.expect(cloudBlobDruid1.getBlobLength()).andReturn(100L).anyTimes();
     replayAll();
 
-    azureInputSource = new AzureInputSource(
-        storage,
+    azureInputSource = new AzureStorageAccountInputSource(
         entityFactory,
         azureCloudBlobIterableFactory,
         inputDataConfig,
+        azureAccountConfig,
         EMPTY_URIS,
         prefixes,
         EMPTY_OBJECTS,
         null,
+        azureStorageAccountInputSourceConfig,
         null
     );
 
@@ -187,7 +202,7 @@ public class AzureInputSourceTest extends EasyMockSupport
     );
 
     List<List<CloudObjectLocation>> actualCloudLocationList = 
cloudObjectStream.map(InputSplit::get)
-                                                                               
.collect(Collectors.toList());
+        .collect(Collectors.toList());
     verifyAll();
     Assert.assertEquals(expectedCloudLocations, actualCloudLocationList);
   }
@@ -209,24 +224,26 @@ public class AzureInputSourceTest extends EasyMockSupport
     );
 
     
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_LISTING_LENGTH);
-    EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes, 
MAX_LISTING_LENGTH)).andReturn(
+    
EasyMock.expect(azureCloudBlobIterableFactory.create(EasyMock.eq(prefixes), 
EasyMock.eq(MAX_LISTING_LENGTH), 
EasyMock.anyObject(AzureStorage.class))).andReturn(
         azureCloudBlobIterable);
     
EasyMock.expect(azureCloudBlobIterable.iterator()).andReturn(expectedCloudBlobsIterator);
+    
EasyMock.expect(cloudBlobDruid1.getStorageAccount()).andReturn(STORAGE_ACCOUNT).anyTimes();
     
EasyMock.expect(cloudBlobDruid1.getBlobLength()).andReturn(100L).anyTimes();
     
EasyMock.expect(cloudBlobDruid1.getContainerName()).andReturn(CONTAINER).anyTimes();
     EasyMock.expect(cloudBlobDruid1.getName()).andReturn(BLOB_PATH).anyTimes();
 
     replayAll();
 
-    azureInputSource = new AzureInputSource(
-        storage,
+    azureInputSource = new AzureStorageAccountInputSource(
         entityFactory,
         azureCloudBlobIterableFactory,
         inputDataConfig,
+        azureAccountConfig,
         EMPTY_URIS,
         prefixes,
         EMPTY_OBJECTS,
         objectGlob,
+        azureStorageAccountInputSourceConfig,
         null
     );
 
@@ -236,7 +253,7 @@ public class AzureInputSourceTest extends EasyMockSupport
     );
 
     List<List<CloudObjectLocation>> actualCloudLocationList = 
cloudObjectStream.map(InputSplit::get)
-                                                                               
.collect(Collectors.toList());
+        .collect(Collectors.toList());
     verifyAll();
     Assert.assertEquals(expectedCloudLocations, actualCloudLocationList);
   }
@@ -248,15 +265,16 @@ public class AzureInputSourceTest extends EasyMockSupport
     
EasyMock.expect(inputSplit.get()).andReturn(ImmutableList.of(CLOUD_OBJECT_LOCATION_1));
     replayAll();
 
-    azureInputSource = new AzureInputSource(
-        storage,
+    azureInputSource = new AzureStorageAccountInputSource(
         entityFactory,
         azureCloudBlobIterableFactory,
         inputDataConfig,
+        azureAccountConfig,
         EMPTY_URIS,
         prefixes,
         EMPTY_OBJECTS,
         null,
+        azureStorageAccountInputSourceConfig,
         null
     );
 
@@ -269,22 +287,22 @@ public class AzureInputSourceTest extends EasyMockSupport
   public void test_toString_returnsExpectedString()
   {
     List<URI> prefixes = ImmutableList.of(PREFIX_URI);
-    azureInputSource = new AzureInputSource(
-        storage,
+    azureInputSource = new AzureStorageAccountInputSource(
         entityFactory,
         azureCloudBlobIterableFactory,
         inputDataConfig,
+        azureAccountConfig,
         EMPTY_URIS,
         prefixes,
         EMPTY_OBJECTS,
         null,
+        azureStorageAccountInputSourceConfig,
         null
     );
-
-    String actualToString = azureInputSource.toString();
+    String azureStorageAccountInputSourceString = azureInputSource.toString();
     Assert.assertEquals(
-        "AzureInputSource{uris=[], prefixes=[azure://container/blob], 
objects=[], objectGlob=null}",
-        actualToString
+        "AzureStorageAccountInputSource{uris=[], 
prefixes=[azureStorage://STORAGE_ACCOUNT/CONTAINER/blob], objects=[], 
objectGlob=null, azureStorageAccountInputSourceConfig=" + 
azureStorageAccountInputSourceConfig + "}",
+        azureStorageAccountInputSourceString
     );
   }
 
@@ -292,28 +310,31 @@ public class AzureInputSourceTest extends EasyMockSupport
   public void test_toString_withAllSystemFields_returnsExpectedString()
   {
     List<URI> prefixes = ImmutableList.of(PREFIX_URI);
-    azureInputSource = new AzureInputSource(
-        storage,
+    azureInputSource = new AzureStorageAccountInputSource(
         entityFactory,
         azureCloudBlobIterableFactory,
         inputDataConfig,
+        azureAccountConfig,
         EMPTY_URIS,
         prefixes,
         EMPTY_OBJECTS,
         null,
+        azureStorageAccountInputSourceConfig,
         new SystemFields(EnumSet.of(SystemField.URI, SystemField.BUCKET, 
SystemField.PATH))
     );
 
-    String actualToString = azureInputSource.toString();
+    String azureStorageAccountInputSourceString = azureInputSource.toString();
+
     Assert.assertEquals(
-        "AzureInputSource{"
-        + "uris=[], "
-        + "prefixes=[azure://container/blob], "
-        + "objects=[], "
-        + "objectGlob=null, "
-        + "systemFields=[__file_uri, __file_bucket, __file_path]"
-        + "}",
-        actualToString
+        "AzureStorageAccountInputSource{"
+            + "uris=[], "
+            + "prefixes=[azureStorage://STORAGE_ACCOUNT/CONTAINER/blob], "
+            + "objects=[], "
+            + "objectGlob=null, "
+            + "azureStorageAccountInputSourceConfig=" + 
azureStorageAccountInputSourceConfig + ", "
+            + "systemFields=[__file_uri, __file_bucket, __file_path]"
+            + "}",
+        azureStorageAccountInputSourceString
     );
   }
 
@@ -321,32 +342,34 @@ public class AzureInputSourceTest extends EasyMockSupport
   public void test_getTypes_returnsExpectedTypes()
   {
     List<URI> prefixes = ImmutableList.of(PREFIX_URI);
-    azureInputSource = new AzureInputSource(
-        storage,
+    azureInputSource = new AzureStorageAccountInputSource(
         entityFactory,
         azureCloudBlobIterableFactory,
         inputDataConfig,
+        azureAccountConfig,
         EMPTY_URIS,
         prefixes,
         EMPTY_OBJECTS,
         null,
+        azureStorageAccountInputSourceConfig,
         null
     );
-    Assert.assertEquals(ImmutableSet.of(AzureInputSource.SCHEME), 
azureInputSource.getTypes());
+    
Assert.assertEquals(ImmutableSet.of(AzureStorageAccountInputSource.SCHEME), 
azureInputSource.getTypes());
   }
 
   @Test
   public void test_systemFields()
   {
-    azureInputSource = (AzureInputSource) new AzureInputSource(
-        storage,
+    azureInputSource = (AzureStorageAccountInputSource) new 
AzureStorageAccountInputSource(
         entityFactory,
         azureCloudBlobIterableFactory,
         inputDataConfig,
+        azureAccountConfig,
         EMPTY_URIS,
         ImmutableList.of(PREFIX_URI),
         EMPTY_OBJECTS,
         null,
+        azureStorageAccountInputSourceConfig,
         new SystemFields(EnumSet.of(SystemField.URI, SystemField.BUCKET, 
SystemField.PATH))
     );
 
@@ -356,29 +379,56 @@ public class AzureInputSourceTest extends EasyMockSupport
     );
 
     final AzureEntity entity = new AzureEntity(
-        new CloudObjectLocation("foo", "bar"),
-        (containerName, blobPath) -> null
+        new CloudObjectLocation("foo", "container/bar"),
+        storage,
+        AzureStorageAccountInputSource.SCHEME,
+        (containerName, blobPath, storage) -> null
     );
 
-    Assert.assertEquals("azure://foo/bar", 
azureInputSource.getSystemFieldValue(entity, SystemField.URI));
+    Assert.assertEquals("azureStorage://foo/container/bar", 
azureInputSource.getSystemFieldValue(entity, SystemField.URI));
     Assert.assertEquals("foo", azureInputSource.getSystemFieldValue(entity, 
SystemField.BUCKET));
-    Assert.assertEquals("bar", azureInputSource.getSystemFieldValue(entity, 
SystemField.PATH));
+    Assert.assertEquals("container/bar", 
azureInputSource.getSystemFieldValue(entity, SystemField.PATH));
   }
 
   @Test
   public void abidesEqualsContract()
   {
-    EqualsVerifier.forClass(AzureInputSource.class)
-                  .usingGetClass()
-                  .withPrefabValues(Logger.class, new 
Logger(AzureStorage.class), new Logger(AzureStorage.class))
-                  .withPrefabValues(AzureStorage.class, new 
AzureStorage(null), new AzureStorage(null))
-                  .withNonnullFields("storage")
-                  .withNonnullFields("entityFactory")
-                  .withNonnullFields("azureCloudBlobIterableFactory")
-                  .withNonnullFields("inputDataConfig")
-                  .withNonnullFields("objectGlob")
-                  .withNonnullFields("scheme")
-                  .verify();
+    EqualsVerifier.forClass(AzureStorageAccountInputSource.class)
+        .usingGetClass()
+        .withPrefabValues(Logger.class, new Logger(AzureStorage.class), new 
Logger(AzureStorage.class))
+        .withPrefabValues(BlobContainerClient.class, new 
BlobContainerClientBuilder().buildClient(), new 
BlobContainerClientBuilder().buildClient())
+        .withPrefabValues(AzureIngestClientFactory.class, new 
AzureIngestClientFactory(null, null), new AzureIngestClientFactory(null, null))
+        .withIgnoredFields("entityFactory")
+        .withIgnoredFields("azureCloudBlobIterableFactory")
+        .withNonnullFields("inputDataConfig")
+        .withNonnullFields("objectGlob")
+        .withNonnullFields("scheme")
+        .withNonnullFields("azureStorageAccountInputSourceConfig")
+        .withNonnullFields("azureAccountConfig")
+        .withNonnullFields("azureIngestClientFactory")
+        .verify();
+  }
+
+  @Test
+  public void test_getContainerAndPathFromObjectLocation()
+  {
+    Pair<String, String> storageLocation = 
AzureStorageAccountInputSource.getContainerAndPathFromObjectLocation(
+        CLOUD_OBJECT_LOCATION_1
+    );
+    Assert.assertEquals(CONTAINER, storageLocation.lhs);
+    Assert.assertEquals(BLOB_PATH, storageLocation.rhs);
+
+  }
+
+  @Test
+  public void test_getContainerAndPathFromObjectLocatio_nullpath()
+  {
+    Pair<String, String> storageLocation = 
AzureStorageAccountInputSource.getContainerAndPathFromObjectLocation(
+        new CloudObjectLocation(STORAGE_ACCOUNT, CONTAINER)
+    );
+    Assert.assertEquals(CONTAINER, storageLocation.lhs);
+    Assert.assertEquals("", storageLocation.rhs);
+
   }
 
   @After
diff --git 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureClientFactoryTest.java
 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureClientFactoryTest.java
index bbf07b402dd..4093719d182 100644
--- 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureClientFactoryTest.java
+++ 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureClientFactoryTest.java
@@ -41,8 +41,7 @@ public class AzureClientFactoryTest
   {
     AzureAccountConfig config = new AzureAccountConfig();
     azureClientFactory = new AzureClientFactory(config);
-    config.setAccount(ACCOUNT);
-    BlobServiceClient blobServiceClient = 
azureClientFactory.getBlobServiceClient(null);
+    BlobServiceClient blobServiceClient = 
azureClientFactory.getBlobServiceClient(null, ACCOUNT);
     Assert.assertEquals(ACCOUNT, blobServiceClient.getAccountName());
   }
 
@@ -51,9 +50,8 @@ public class AzureClientFactoryTest
   {
     AzureAccountConfig config = new AzureAccountConfig();
     config.setKey("key");
-    config.setAccount(ACCOUNT);
     azureClientFactory = new AzureClientFactory(config);
-    BlobServiceClient blobServiceClient = 
azureClientFactory.getBlobServiceClient(null);
+    BlobServiceClient blobServiceClient = 
azureClientFactory.getBlobServiceClient(null, ACCOUNT);
     StorageSharedKeyCredential storageSharedKeyCredential = 
StorageSharedKeyCredential.getSharedKeyCredentialFromPipeline(
         blobServiceClient.getHttpPipeline()
     );
@@ -71,9 +69,8 @@ public class AzureClientFactoryTest
   {
     AzureAccountConfig config = new AzureAccountConfig();
     config.setSharedAccessStorageToken("sasToken");
-    config.setAccount(ACCOUNT);
     azureClientFactory = new AzureClientFactory(config);
-    BlobServiceClient blobServiceClient = 
azureClientFactory.getBlobServiceClient(null);
+    BlobServiceClient blobServiceClient = 
azureClientFactory.getBlobServiceClient(null, ACCOUNT);
     AzureSasCredentialPolicy azureSasCredentialPolicy = null;
     for (int i = 0; i < blobServiceClient.getHttpPipeline().getPolicyCount(); 
i++) {
       if (blobServiceClient.getHttpPipeline().getPolicy(i) instanceof 
AzureSasCredentialPolicy) {
@@ -89,9 +86,8 @@ public class AzureClientFactoryTest
   {
     AzureAccountConfig config = new AzureAccountConfig();
     config.setUseAzureCredentialsChain(true);
-    config.setAccount(ACCOUNT);
     azureClientFactory = new AzureClientFactory(config);
-    BlobServiceClient blobServiceClient = 
azureClientFactory.getBlobServiceClient(null);
+    BlobServiceClient blobServiceClient = 
azureClientFactory.getBlobServiceClient(null, ACCOUNT);
     BearerTokenAuthenticationPolicy bearerTokenAuthenticationPolicy = null;
     for (int i = 0; i < blobServiceClient.getHttpPipeline().getPolicyCount(); 
i++) {
       if (blobServiceClient.getHttpPipeline().getPolicy(i) instanceof 
BearerTokenAuthenticationPolicy) {
@@ -107,10 +103,9 @@ public class AzureClientFactoryTest
   {
     AzureAccountConfig config = new AzureAccountConfig();
     config.setUseAzureCredentialsChain(true);
-    config.setAccount(ACCOUNT);
     azureClientFactory = new AzureClientFactory(config);
-    BlobServiceClient blobServiceClient = 
azureClientFactory.getBlobServiceClient(null);
-    BlobServiceClient blobServiceClient2 = 
azureClientFactory.getBlobServiceClient(null);
+    BlobServiceClient blobServiceClient = 
azureClientFactory.getBlobServiceClient(null, ACCOUNT);
+    BlobServiceClient blobServiceClient2 = 
azureClientFactory.getBlobServiceClient(null, ACCOUNT);
     Assert.assertEquals(blobServiceClient, blobServiceClient2);
   }
 
@@ -119,10 +114,9 @@ public class AzureClientFactoryTest
   {
     AzureAccountConfig config = new AzureAccountConfig();
     config.setUseAzureCredentialsChain(true);
-    config.setAccount(ACCOUNT);
     azureClientFactory = new AzureClientFactory(config);
-    BlobServiceClient blobServiceClient = 
azureClientFactory.getBlobServiceClient(null);
-    BlobServiceClient blobServiceClient2 = 
azureClientFactory.getBlobServiceClient(1);
+    BlobServiceClient blobServiceClient = 
azureClientFactory.getBlobServiceClient(null, ACCOUNT);
+    BlobServiceClient blobServiceClient2 = 
azureClientFactory.getBlobServiceClient(1, ACCOUNT);
     Assert.assertNotEquals(blobServiceClient, blobServiceClient2);
   }
 
@@ -131,12 +125,11 @@ public class AzureClientFactoryTest
   {
     AzureAccountConfig config = EasyMock.createMock(AzureAccountConfig.class);
     EasyMock.expect(config.getKey()).andReturn("key").times(2);
-    EasyMock.expect(config.getAccount()).andReturn(ACCOUNT).times(2);
     EasyMock.expect(config.getMaxTries()).andReturn(3);
     
EasyMock.expect(config.getBlobStorageEndpoint()).andReturn(AzureUtils.AZURE_STORAGE_HOST_ADDRESS);
     azureClientFactory = new AzureClientFactory(config);
     EasyMock.replay(config);
-    azureClientFactory.getBlobServiceClient(null);
+    azureClientFactory.getBlobServiceClient(null, ACCOUNT);
     EasyMock.verify(config);
   }
 }
diff --git 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java
 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java
index d66c3c21156..996028377ed 100644
--- 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java
+++ 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java
@@ -54,11 +54,16 @@ public class AzureCloudBlobIteratorTest extends 
EasyMockSupport
   private final Integer MAX_TRIES = 3;
   private final Integer MAX_LISTING_LENGTH = 10;
   private final String CONTAINER = "container";
+  private final String STORAGE_ACCOUNT = "storageAccount";
+  private final String DEFAULT_STORAGE_ACCOUNT = "defaultStorageAccount";
+
 
   @Before
   public void setup()
   {
     config.setMaxTries(MAX_TRIES);
+    config.setAccount(DEFAULT_STORAGE_ACCOUNT);
+
   }
 
   @Test
@@ -86,7 +91,108 @@ public class AzureCloudBlobIteratorTest extends 
EasyMockSupport
     SettableSupplier<PagedResponse<BlobItem>> supplier = new 
SettableSupplier<>();
     supplier.set(new TestPagedResponse<>(ImmutableList.of(blobItem)));
     PagedIterable<BlobItem> pagedIterable = new PagedIterable<>(supplier);
-    EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(CONTAINER, 
"dir1", MAX_LISTING_LENGTH, MAX_TRIES))
+    
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(DEFAULT_STORAGE_ACCOUNT,
 CONTAINER, "dir1", MAX_LISTING_LENGTH, MAX_TRIES))
+        .andReturn(pagedIterable);
+
+    BlobItem blobPrefixItem = new 
BlobItem().setIsPrefix(true).setName("subdir").setProperties(new 
BlobItemProperties());
+    BlobItem blobItem2 = new BlobItem().setName("blobName2").setProperties(new 
BlobItemProperties().setContentLength(10L));
+    SettableSupplier<PagedResponse<BlobItem>> supplier2 = new 
SettableSupplier<>();
+    supplier2.set(new TestPagedResponse<>(ImmutableList.of(blobPrefixItem, 
blobItem2)));
+    PagedIterable<BlobItem> pagedIterable2 = new PagedIterable<>(supplier2);
+    
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(DEFAULT_STORAGE_ACCOUNT,
 CONTAINER, "dir2", MAX_LISTING_LENGTH, MAX_TRIES))
+        .andReturn(pagedIterable2);
+
+    replayAll();
+    azureCloudBlobIterator = new AzureCloudBlobIterator(
+        storage,
+        config,
+        prefixes,
+        MAX_LISTING_LENGTH
+    );
+    List<CloudBlobHolder> actualBlobItems = new ArrayList<>();
+    while (azureCloudBlobIterator.hasNext()) {
+      actualBlobItems.add(azureCloudBlobIterator.next());
+    }
+    verifyAll();
+    List<CloudBlobHolder> expectedBlobItems = ImmutableList.of(
+        new CloudBlobHolder(blobItem, CONTAINER, DEFAULT_STORAGE_ACCOUNT),
+        new CloudBlobHolder(blobItem2, CONTAINER, DEFAULT_STORAGE_ACCOUNT)
+    );
+    Assert.assertEquals(expectedBlobItems.size(), actualBlobItems.size());
+    Assert.assertEquals(
+        
expectedBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet()),
+        
actualBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet())
+    );
+    Assert.assertEquals(
+        
expectedBlobItems.stream().map(CloudBlobHolder::getStorageAccount).collect(Collectors.toSet()),
+        
actualBlobItems.stream().map(CloudBlobHolder::getStorageAccount).collect(Collectors.toSet())
+    );
+    Assert.assertEquals(
+        
expectedBlobItems.stream().map(CloudBlobHolder::getContainerName).collect(Collectors.toSet()),
+        
actualBlobItems.stream().map(CloudBlobHolder::getContainerName).collect(Collectors.toSet())
+    );
+  }
+
+  @Test
+  public void 
test_next_prefixesWithMultipleBlobsAndOneDirectory_returnsExpectedBlobs() 
throws Exception
+  {
+    List<URI> prefixes = ImmutableList.of(
+        new URI(StringUtils.format("azure://%s/dir1", CONTAINER))
+    );
+
+    BlobItem blobItem = new BlobItem().setName("blobName").setProperties(new 
BlobItemProperties().setContentLength(10L));
+    BlobItem blobItem2 = new BlobItem().setName("blobName2").setProperties(new 
BlobItemProperties().setContentLength(10L));
+    SettableSupplier<PagedResponse<BlobItem>> supplier = new 
SettableSupplier<>();
+    supplier.set(new TestPagedResponse<>(ImmutableList.of(blobItem, 
blobItem2)));
+    PagedIterable<BlobItem> pagedIterable = new PagedIterable<>(supplier);
+    
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(DEFAULT_STORAGE_ACCOUNT,
 CONTAINER, "dir1", MAX_LISTING_LENGTH, MAX_TRIES))
+        .andReturn(pagedIterable);
+
+
+    replayAll();
+    azureCloudBlobIterator = new AzureCloudBlobIterator(
+        storage,
+        config,
+        prefixes,
+        MAX_LISTING_LENGTH
+    );
+    List<CloudBlobHolder> actualBlobItems = new ArrayList<>();
+    while (azureCloudBlobIterator.hasNext()) {
+      actualBlobItems.add(azureCloudBlobIterator.next());
+    }
+    verifyAll();
+    List<CloudBlobHolder> expectedBlobItems = ImmutableList.of(
+        new CloudBlobHolder(blobItem, CONTAINER, DEFAULT_STORAGE_ACCOUNT),
+        new CloudBlobHolder(blobItem2, CONTAINER, DEFAULT_STORAGE_ACCOUNT)
+    );
+    Assert.assertEquals(expectedBlobItems.size(), actualBlobItems.size());
+    Assert.assertEquals(
+        
expectedBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet()),
+        
actualBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet())
+    );
+    Assert.assertEquals(
+        
expectedBlobItems.stream().map(CloudBlobHolder::getStorageAccount).collect(Collectors.toSet()),
+        
actualBlobItems.stream().map(CloudBlobHolder::getStorageAccount).collect(Collectors.toSet())
+    );
+    Assert.assertEquals(
+        
expectedBlobItems.stream().map(CloudBlobHolder::getContainerName).collect(Collectors.toSet()),
+        
actualBlobItems.stream().map(CloudBlobHolder::getContainerName).collect(Collectors.toSet())
+    );
+  }
+
+  @Test
+  public void 
test_next_prefixesWithMultipleBlobsAndSomeDirectories_returnsExpectedBlobs_azureStorage()
 throws Exception
+  {
+    List<URI> prefixes = ImmutableList.of(
+        new URI(StringUtils.format("azureStorage://%s/%s/dir1", 
STORAGE_ACCOUNT, CONTAINER)),
+        new URI(StringUtils.format("azureStorage://%s/%s/dir2", 
STORAGE_ACCOUNT, CONTAINER))
+    );
+
+    BlobItem blobItem = new BlobItem().setName("blobName").setProperties(new 
BlobItemProperties().setContentLength(10L));
+    SettableSupplier<PagedResponse<BlobItem>> supplier = new 
SettableSupplier<>();
+    supplier.set(new TestPagedResponse<>(ImmutableList.of(blobItem)));
+    PagedIterable<BlobItem> pagedIterable = new PagedIterable<>(supplier);
+    
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(STORAGE_ACCOUNT,
 CONTAINER, "dir1", MAX_LISTING_LENGTH, MAX_TRIES))
         .andReturn(pagedIterable);
 
     BlobItem blobPrefixItem = new 
BlobItem().setIsPrefix(true).setName("subdir").setProperties(new 
BlobItemProperties());
@@ -94,7 +200,7 @@ public class AzureCloudBlobIteratorTest extends 
EasyMockSupport
     SettableSupplier<PagedResponse<BlobItem>> supplier2 = new 
SettableSupplier<>();
     supplier2.set(new TestPagedResponse<>(ImmutableList.of(blobPrefixItem, 
blobItem2)));
     PagedIterable<BlobItem> pagedIterable2 = new PagedIterable<>(supplier2);
-    EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(CONTAINER, 
"dir2", MAX_LISTING_LENGTH, MAX_TRIES))
+    
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(STORAGE_ACCOUNT,
 CONTAINER, "dir2", MAX_LISTING_LENGTH, MAX_TRIES))
         .andReturn(pagedIterable2);
 
     replayAll();
@@ -110,13 +216,22 @@ public class AzureCloudBlobIteratorTest extends 
EasyMockSupport
     }
     verifyAll();
     List<CloudBlobHolder> expectedBlobItems = ImmutableList.of(
-        new CloudBlobHolder(blobItem, CONTAINER),
-        new CloudBlobHolder(blobItem2, CONTAINER)
+        new CloudBlobHolder(blobItem, CONTAINER, STORAGE_ACCOUNT),
+        new CloudBlobHolder(blobItem2, CONTAINER, STORAGE_ACCOUNT)
     );
     Assert.assertEquals(expectedBlobItems.size(), actualBlobItems.size());
     Assert.assertEquals(
         
expectedBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet()),
-        
actualBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet()));
+        
actualBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet())
+    );
+    Assert.assertEquals(
+        
expectedBlobItems.stream().map(CloudBlobHolder::getStorageAccount).collect(Collectors.toSet()),
+        
actualBlobItems.stream().map(CloudBlobHolder::getStorageAccount).collect(Collectors.toSet())
+    );
+    Assert.assertEquals(
+        
expectedBlobItems.stream().map(CloudBlobHolder::getContainerName).collect(Collectors.toSet()),
+        
actualBlobItems.stream().map(CloudBlobHolder::getContainerName).collect(Collectors.toSet())
+    );
   }
 
   @Test
@@ -132,7 +247,7 @@ public class AzureCloudBlobIteratorTest extends 
EasyMockSupport
     SettableSupplier<PagedResponse<BlobItem>> supplier = new 
SettableSupplier<>();
     supplier.set(new TestPagedResponse<>(ImmutableList.of(blobItem, 
blobItem2)));
     PagedIterable<BlobItem> pagedIterable = new PagedIterable<>(supplier);
-    EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(CONTAINER, 
"dir1", MAX_LISTING_LENGTH, MAX_TRIES))
+    
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(DEFAULT_STORAGE_ACCOUNT,
 CONTAINER, "dir1", MAX_LISTING_LENGTH, MAX_TRIES))
         .andReturn(pagedIterable);
 
     replayAll();
@@ -148,12 +263,21 @@ public class AzureCloudBlobIteratorTest extends 
EasyMockSupport
     }
     verifyAll();
     List<CloudBlobHolder> expectedBlobItems = ImmutableList.of(
-        new CloudBlobHolder(blobItem, CONTAINER)
+        new CloudBlobHolder(blobItem, CONTAINER, DEFAULT_STORAGE_ACCOUNT)
     );
     Assert.assertEquals(expectedBlobItems.size(), actualBlobItems.size());
     Assert.assertEquals(
         
expectedBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet()),
-        
actualBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet()));
+        
actualBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet())
+    );
+    Assert.assertEquals(
+        
expectedBlobItems.stream().map(CloudBlobHolder::getStorageAccount).collect(Collectors.toSet()),
+        
actualBlobItems.stream().map(CloudBlobHolder::getStorageAccount).collect(Collectors.toSet())
+    );
+    Assert.assertEquals(
+        
expectedBlobItems.stream().map(CloudBlobHolder::getContainerName).collect(Collectors.toSet()),
+        
actualBlobItems.stream().map(CloudBlobHolder::getContainerName).collect(Collectors.toSet())
+    );
   }
 
   @Test(expected = NoSuchElementException.class)
@@ -176,6 +300,7 @@ public class AzureCloudBlobIteratorTest extends 
EasyMockSupport
     );
 
     EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(
+        EasyMock.anyString(),
         EasyMock.anyString(),
         EasyMock.anyString(),
         EasyMock.anyInt(),
@@ -199,6 +324,7 @@ public class AzureCloudBlobIteratorTest extends 
EasyMockSupport
         new URI(StringUtils.format("azure://%s/dir1", CONTAINER))
     );
     EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(
+        EasyMock.anyString(),
         EasyMock.anyString(),
         EasyMock.anyString(),
         EasyMock.anyInt(),
diff --git 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java
 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java
index 2c24ea23e98..55b5a7dc612 100644
--- 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java
+++ 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java
@@ -184,7 +184,9 @@ public class AzureDataSegmentKillerTest extends 
EasyMockSupport
         azureCloudBlobIterableFactory,
         MAX_KEYS,
         PREFIX_URI,
-        ImmutableList.of(object1, object2));
+        ImmutableList.of(object1, object2),
+        azureStorage
+    );
 
     EasyMock.replay(object1, object2);
     AzureTestUtils.expectDeleteObjects(
@@ -217,7 +219,8 @@ public class AzureDataSegmentKillerTest extends 
EasyMockSupport
           azureCloudBlobIterableFactory,
           MAX_KEYS,
           PREFIX_URI,
-          ImmutableList.of(object1)
+          ImmutableList.of(object1),
+          azureStorage
       );
 
       EasyMock.replay(object1);
diff --git 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java
 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java
index ac851877c53..ebcefd79571 100644
--- 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java
+++ 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java
@@ -62,12 +62,12 @@ public class AzureDataSegmentPullerTest extends 
EasyMockSupport
       final InputStream zipStream = new FileInputStream(pulledFile);
       final AzureAccountConfig config = new AzureAccountConfig();
 
-      EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, 
BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, 
BLOB_PATH));
+      EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH, 
azureStorage)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, 
BLOB_PATH));
       EasyMock.expect(azureStorage.getBlockBlobInputStream(0L, CONTAINER_NAME, 
BLOB_PATH)).andReturn(zipStream);
 
       replayAll();
 
-      AzureDataSegmentPuller puller = new 
AzureDataSegmentPuller(byteSourceFactory, config);
+      AzureDataSegmentPuller puller = new 
AzureDataSegmentPuller(byteSourceFactory, azureStorage, config);
 
       FileUtils.FileCopyResult result = puller.getSegmentFiles(CONTAINER_NAME, 
BLOB_PATH, toDir);
 
@@ -95,12 +95,12 @@ public class AzureDataSegmentPullerTest extends 
EasyMockSupport
       final InputStream zipStream = new FileInputStream(pulledFile);
       final AzureAccountConfig config = new AzureAccountConfig();
 
-      EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, 
BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, 
BLOB_PATH));
+      EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH, 
azureStorage)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, 
BLOB_PATH));
       EasyMock.expect(azureStorage.getBlockBlobInputStream(0L, CONTAINER_NAME, 
BLOB_PATH)).andReturn(zipStream);
 
       replayAll();
 
-      AzureDataSegmentPuller puller = new 
AzureDataSegmentPuller(byteSourceFactory, config);
+      AzureDataSegmentPuller puller = new 
AzureDataSegmentPuller(byteSourceFactory, azureStorage, config);
 
       FileUtils.FileCopyResult result = puller.getSegmentFiles(CONTAINER_NAME, 
BLOB_PATH_HADOOP, toDir);
 
@@ -125,7 +125,7 @@ public class AzureDataSegmentPullerTest extends 
EasyMockSupport
 
     final File outDir = FileUtils.createTempDir();
     try {
-      EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, 
BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, 
BLOB_PATH));
+      EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH, 
azureStorage)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, 
BLOB_PATH));
       EasyMock.expect(azureStorage.getBlockBlobInputStream(0L, CONTAINER_NAME, 
BLOB_PATH)).andThrow(
           new RuntimeException(
               "error"
@@ -134,7 +134,7 @@ public class AzureDataSegmentPullerTest extends 
EasyMockSupport
 
       replayAll();
 
-      AzureDataSegmentPuller puller = new 
AzureDataSegmentPuller(byteSourceFactory, config);
+      AzureDataSegmentPuller puller = new 
AzureDataSegmentPuller(byteSourceFactory, azureStorage, config);
 
       puller.getSegmentFiles(CONTAINER_NAME, BLOB_PATH, outDir);
     }
@@ -159,7 +159,7 @@ public class AzureDataSegmentPullerTest extends 
EasyMockSupport
       HttpResponse httpResponse = createMock(HttpResponse.class);
       EasyMock.expect(httpResponse.getStatusCode()).andReturn(500).anyTimes();
       EasyMock.replay(httpResponse);
-      EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, 
BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, 
BLOB_PATH));
+      EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH, 
azureStorage)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, 
BLOB_PATH));
       EasyMock.expect(azureStorage.getBlockBlobInputStream(0L, CONTAINER_NAME, 
BLOB_PATH)).andThrow(
           new BlobStorageException("", httpResponse, null)
       ).atLeastOnce();
@@ -167,7 +167,7 @@ public class AzureDataSegmentPullerTest extends 
EasyMockSupport
       EasyMock.replay(azureStorage);
       EasyMock.replay(byteSourceFactory);
 
-      AzureDataSegmentPuller puller = new 
AzureDataSegmentPuller(byteSourceFactory, config);
+      AzureDataSegmentPuller puller = new 
AzureDataSegmentPuller(byteSourceFactory, azureStorage, config);
 
       puller.getSegmentFiles(CONTAINER_NAME, BLOB_PATH, outDir);
 
diff --git 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureIngestClientFactoryTest.java
 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureIngestClientFactoryTest.java
new file mode 100644
index 00000000000..d982cf2253e
--- /dev/null
+++ 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureIngestClientFactoryTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure;
+
+import com.azure.core.http.policy.AzureSasCredentialPolicy;
+import com.azure.core.http.policy.BearerTokenAuthenticationPolicy;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.azure.AzureStorageAccountInputSourceConfig;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+@RunWith(EasyMockRunner.class)
+public class AzureIngestClientFactoryTest extends EasyMockSupport
+{
+  private AzureIngestClientFactory azureIngestClientFactory;
+  private static final String ACCOUNT = "account";
+  private static final String KEY = "key";
+  private static final String TOKEN = "token";
+
+  @Mock
+  private static AzureAccountConfig accountConfig;
+
+  @Mock
+  private static AzureStorageAccountInputSourceConfig 
azureStorageAccountInputSourceConfig;
+
+  @Before
+  public void setup()
+  {
+    
EasyMock.expect(accountConfig.getBlobStorageEndpoint()).andReturn("blob.core.windows.net").anyTimes();
+  }
+
+  @Test
+  public void test_blobServiceClient_accountName()
+  {
+    AzureStorageAccountInputSourceConfig azureStorageAccountInputSourceConfig 
= new AzureStorageAccountInputSourceConfig(
+        null,
+        KEY,
+        null,
+        null,
+        null
+    );
+    azureIngestClientFactory = new AzureIngestClientFactory(accountConfig, 
azureStorageAccountInputSourceConfig);
+    replayAll();
+    BlobServiceClient blobServiceClient = 
azureIngestClientFactory.getBlobServiceClient(3, ACCOUNT);
+    verifyAll();
+
+    Assert.assertEquals(ACCOUNT, blobServiceClient.getAccountName());
+  }
+
+  @Test
+  public void test_blobServiceClientBuilder_key() throws MalformedURLException
+  {
+    AzureStorageAccountInputSourceConfig azureStorageAccountInputSourceConfig 
= new AzureStorageAccountInputSourceConfig(
+        null,
+        KEY,
+        null,
+        null,
+        null
+    );
+    azureIngestClientFactory = new AzureIngestClientFactory(accountConfig, 
azureStorageAccountInputSourceConfig);
+
+    replayAll();
+    BlobServiceClient blobServiceClient = 
azureIngestClientFactory.getBlobServiceClient(3, ACCOUNT);
+    verifyAll();
+    StorageSharedKeyCredential storageSharedKeyCredential = 
StorageSharedKeyCredential.getSharedKeyCredentialFromPipeline(
+        blobServiceClient.getHttpPipeline()
+    );
+    Assert.assertNotNull(storageSharedKeyCredential);
+
+    // Azure doesn't let us look at the key in the StorageSharedKeyCredential 
so make sure the authorization header generated is what we expect.
+    Assert.assertEquals(
+        new StorageSharedKeyCredential(ACCOUNT, 
KEY).generateAuthorizationHeader(new URL("http://druid.com";), "POST", 
ImmutableMap.of()),
+        storageSharedKeyCredential.generateAuthorizationHeader(new 
URL("http://druid.com";), "POST", ImmutableMap.of())
+    );
+  }
+
+  @Test
+  public void test_blobServiceClientBuilder_sasToken()
+  {
+    AzureStorageAccountInputSourceConfig azureStorageAccountInputSourceConfig 
= new AzureStorageAccountInputSourceConfig(
+        TOKEN,
+        null,
+        null,
+        null,
+        null
+    );
+    azureIngestClientFactory = new AzureIngestClientFactory(accountConfig, 
azureStorageAccountInputSourceConfig);
+    replayAll();
+    BlobServiceClient blobServiceClient = 
azureIngestClientFactory.getBlobServiceClient(3, ACCOUNT);
+    verifyAll();
+
+    AzureSasCredentialPolicy azureSasCredentialPolicy = null;
+    for (int i = 0; i < blobServiceClient.getHttpPipeline().getPolicyCount(); 
i++) {
+      if (blobServiceClient.getHttpPipeline().getPolicy(i) instanceof 
AzureSasCredentialPolicy) {
+        azureSasCredentialPolicy = (AzureSasCredentialPolicy) 
blobServiceClient.getHttpPipeline().getPolicy(i);
+      }
+    }
+
+    Assert.assertNotNull(azureSasCredentialPolicy);
+  }
+
+  @Test
+  public void test_blobServiceClientBuilder_useAppRegistration()
+  {
+    AzureStorageAccountInputSourceConfig azureStorageAccountInputSourceConfig 
= new AzureStorageAccountInputSourceConfig(
+        null,
+        null,
+        "clientId",
+        "clientSecret",
+        "tenantId"
+    );
+    azureIngestClientFactory = new AzureIngestClientFactory(accountConfig, 
azureStorageAccountInputSourceConfig);
+    replayAll();
+    BlobServiceClient blobServiceClient = 
azureIngestClientFactory.getBlobServiceClient(3, ACCOUNT);
+    verifyAll();
+    BearerTokenAuthenticationPolicy bearerTokenAuthenticationPolicy = null;
+    for (int i = 0; i < blobServiceClient.getHttpPipeline().getPolicyCount(); 
i++) {
+      if (blobServiceClient.getHttpPipeline().getPolicy(i) instanceof 
BearerTokenAuthenticationPolicy) {
+        bearerTokenAuthenticationPolicy = (BearerTokenAuthenticationPolicy) 
blobServiceClient.getHttpPipeline().getPolicy(i);
+      }
+    }
+
+    Assert.assertNotNull(bearerTokenAuthenticationPolicy);
+  }
+
+
+  @Test
+  public void 
test_blobServiceClientBuilder_useAzureAccountConfig_asDefaultMaxTries()
+  {
+    // We should only call getKey twice (both times in the first call to 
getBlobServiceClient)
+    
EasyMock.expect(azureStorageAccountInputSourceConfig.getKey()).andReturn(KEY).times(2);
+    azureIngestClientFactory = new AzureIngestClientFactory(accountConfig, 
azureStorageAccountInputSourceConfig);
+    EasyMock.expect(accountConfig.getMaxTries()).andReturn(5);
+    replayAll();
+    azureIngestClientFactory.getBlobServiceClient(null, ACCOUNT);
+
+    // should use the cached client and not call getKey
+    azureIngestClientFactory.getBlobServiceClient(5, ACCOUNT);
+
+    // should use the cached client and not call getKey
+    azureIngestClientFactory.getBlobServiceClient(5, ACCOUNT);
+
+    verifyAll();
+  }
+
+  @Test
+  public void test_blobServiceClientBuilder_fallbackToAzureAccountConfig()
+  {
+    AzureStorageAccountInputSourceConfig azureStorageAccountInputSourceConfig 
= new AzureStorageAccountInputSourceConfig(
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+    azureIngestClientFactory = new AzureIngestClientFactory(accountConfig, 
azureStorageAccountInputSourceConfig);
+    EasyMock.expect(accountConfig.getKey()).andReturn(KEY).times(2);
+    replayAll();
+    azureIngestClientFactory.getBlobServiceClient(5, ACCOUNT);
+    verifyAll();
+  }
+}
diff --git 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java
 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java
index 27d02cd2354..ba69591f6c8 100644
--- 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java
+++ 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java
@@ -30,6 +30,8 @@ import com.google.inject.Module;
 import com.google.inject.ProvisionException;
 import com.google.inject.TypeLiteral;
 import org.apache.druid.data.input.azure.AzureEntityFactory;
+import org.apache.druid.data.input.azure.AzureInputSource;
+import org.apache.druid.data.input.azure.AzureStorageAccountInputSource;
 import org.apache.druid.data.input.impl.CloudObjectLocation;
 import org.apache.druid.guice.DruidGuiceExtensions;
 import org.apache.druid.guice.JsonConfigurator;
@@ -63,12 +65,13 @@ public class AzureStorageDruidModuleTest extends 
EasyMockSupport
   private static final String AZURE_CONTAINER;
   private static final String AZURE_PREFIX;
   private static final int AZURE_MAX_LISTING_LENGTH;
-  private static final String PATH = "path";
+  private static final String PATH = "path/subpath";
   private static final Iterable<URI> EMPTY_PREFIXES_ITERABLE = 
ImmutableList.of();
   private static final Properties PROPERTIES;
 
   private CloudObjectLocation cloudObjectLocation1;
   private CloudObjectLocation cloudObjectLocation2;
+  private AzureStorage azureStorage;
   private Injector injector;
 
   static {
@@ -93,6 +96,7 @@ public class AzureStorageDruidModuleTest extends 
EasyMockSupport
   {
     cloudObjectLocation1 = createMock(CloudObjectLocation.class);
     cloudObjectLocation2 = createMock(CloudObjectLocation.class);
+    azureStorage = createMock(AzureStorage.class);
   }
 
   @Test
@@ -143,8 +147,8 @@ public class AzureStorageDruidModuleTest extends 
EasyMockSupport
   {
     injector = makeInjectorWithProperties(PROPERTIES);
     AzureByteSourceFactory factory = 
injector.getInstance(AzureByteSourceFactory.class);
-    Object object1 = factory.create("container1", "blob1");
-    Object object2 = factory.create("container2", "blob2");
+    Object object1 = factory.create("container1", "blob1", azureStorage);
+    Object object2 = factory.create("container2", "blob2", azureStorage);
     Assert.assertNotNull(object1);
     Assert.assertNotNull(object2);
     Assert.assertNotSame(object1, object2);
@@ -155,17 +159,20 @@ public class AzureStorageDruidModuleTest extends 
EasyMockSupport
   {
     
EasyMock.expect(cloudObjectLocation1.getBucket()).andReturn(AZURE_CONTAINER);
     
EasyMock.expect(cloudObjectLocation2.getBucket()).andReturn(AZURE_CONTAINER);
-    EasyMock.expect(cloudObjectLocation1.getPath()).andReturn(PATH);
+    EasyMock.expect(cloudObjectLocation1.getPath()).andReturn(PATH).times(2);
     EasyMock.expect(cloudObjectLocation2.getPath()).andReturn(PATH);
     replayAll();
 
     injector = makeInjectorWithProperties(PROPERTIES);
     AzureEntityFactory factory = 
injector.getInstance(AzureEntityFactory.class);
-    Object object1 = factory.create(cloudObjectLocation1);
-    Object object2 = factory.create(cloudObjectLocation2);
+    Object object1 = factory.create(cloudObjectLocation1, azureStorage, 
AzureInputSource.SCHEME);
+    Object object2 = factory.create(cloudObjectLocation2, azureStorage, 
AzureInputSource.SCHEME);
+    Object object3 = factory.create(cloudObjectLocation1, azureStorage, 
AzureStorageAccountInputSource.SCHEME);
     Assert.assertNotNull(object1);
     Assert.assertNotNull(object2);
+    Assert.assertNotNull(object3);
     Assert.assertNotSame(object1, object2);
+    Assert.assertNotSame(object1, object3);
   }
 
   @Test
@@ -173,8 +180,8 @@ public class AzureStorageDruidModuleTest extends 
EasyMockSupport
   {
     injector = makeInjectorWithProperties(PROPERTIES);
     AzureCloudBlobIteratorFactory factory = 
injector.getInstance(AzureCloudBlobIteratorFactory.class);
-    Object object1 = factory.create(EMPTY_PREFIXES_ITERABLE, 10);
-    Object object2 = factory.create(EMPTY_PREFIXES_ITERABLE, 10);
+    Object object1 = factory.create(EMPTY_PREFIXES_ITERABLE, 10, azureStorage);
+    Object object2 = factory.create(EMPTY_PREFIXES_ITERABLE, 10, azureStorage);
     Assert.assertNotNull(object1);
     Assert.assertNotNull(object2);
     Assert.assertNotSame(object1, object2);
@@ -185,8 +192,8 @@ public class AzureStorageDruidModuleTest extends 
EasyMockSupport
   {
     injector = makeInjectorWithProperties(PROPERTIES);
     AzureCloudBlobIterableFactory factory = 
injector.getInstance(AzureCloudBlobIterableFactory.class);
-    AzureCloudBlobIterable object1 = factory.create(EMPTY_PREFIXES_ITERABLE, 
10);
-    AzureCloudBlobIterable object2 = factory.create(EMPTY_PREFIXES_ITERABLE, 
10);
+    AzureCloudBlobIterable object1 = factory.create(EMPTY_PREFIXES_ITERABLE, 
10, azureStorage);
+    AzureCloudBlobIterable object2 = factory.create(EMPTY_PREFIXES_ITERABLE, 
10, azureStorage);
     Assert.assertNotNull(object1);
     Assert.assertNotNull(object2);
     Assert.assertNotSame(object1, object2);
diff --git 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
index c31dedf2191..65acc9346a6 100644
--- 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
+++ 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
@@ -49,6 +49,7 @@ public class AzureStorageTest
   BlobContainerClient blobContainerClient = 
Mockito.mock(BlobContainerClient.class);
   AzureClientFactory azureClientFactory = 
Mockito.mock(AzureClientFactory.class);
 
+  private final String STORAGE_ACCOUNT = "storageAccount";
   private final String CONTAINER = "container";
   private final String BLOB_NAME = "blobName";
   private final Integer MAX_ATTEMPTS = 3;
@@ -56,7 +57,7 @@ public class AzureStorageTest
   @Before
   public void setup() throws BlobStorageException
   {
-    azureStorage = new AzureStorage(azureClientFactory);
+    azureStorage = new AzureStorage(azureClientFactory, STORAGE_ACCOUNT);
   }
 
   @Test
@@ -71,7 +72,7 @@ public class AzureStorageTest
         ArgumentMatchers.any()
     );
     
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
-    
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(MAX_ATTEMPTS);
+    
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(MAX_ATTEMPTS,
 STORAGE_ACCOUNT);
 
     Assert.assertEquals(ImmutableList.of(BLOB_NAME), 
azureStorage.listDir(CONTAINER, "", MAX_ATTEMPTS));
   }
@@ -88,11 +89,35 @@ public class AzureStorageTest
         ArgumentMatchers.any()
     );
     
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
-    
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null);
+    
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null,
 STORAGE_ACCOUNT);
 
     Assert.assertEquals(ImmutableList.of(BLOB_NAME), 
azureStorage.listDir(CONTAINER, "", null));
   }
 
+  @Test
+  public void testListBlobsWithPrefixInContainerSegmented() throws 
BlobStorageException
+  {
+    String storageAccountCustom = "customStorageAccount";
+    BlobItem blobItem = new BlobItem().setName(BLOB_NAME).setProperties(new 
BlobItemProperties().setContentLength(10L));
+    SettableSupplier<PagedResponse<BlobItem>> supplier = new 
SettableSupplier<>();
+    supplier.set(new TestPagedResponse<>(ImmutableList.of(blobItem)));
+    PagedIterable<BlobItem> pagedIterable = new PagedIterable<>(supplier);
+    Mockito.doReturn(pagedIterable).when(blobContainerClient).listBlobs(
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any()
+    );
+    
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
+    
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(3,
 storageAccountCustom);
+
+    azureStorage.listBlobsWithPrefixInContainerSegmented(
+        storageAccountCustom,
+        CONTAINER,
+        "",
+        1,
+        3
+    );
+  }
+
   @Test
   public void testBatchDeleteFiles_emptyResponse() throws BlobStorageException
   {
@@ -107,7 +132,7 @@ public class AzureStorageTest
 
     
Mockito.doReturn(containerUrl).when(blobContainerClient).getBlobContainerUrl();
     
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
-    
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null);
+    
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null,
 STORAGE_ACCOUNT);
     
Mockito.doReturn(blobBatchClient).when(azureClientFactory).getBlobBatchClient(blobContainerClient);
     Mockito.doReturn(pagedIterable).when(blobBatchClient).deleteBlobs(
         captor.capture(), 
ArgumentMatchers.eq(DeleteSnapshotsOptionType.INCLUDE)
diff --git 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
index 3ce9d8fb3b2..92010952802 100644
--- 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
+++ 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
@@ -533,7 +533,9 @@ public class AzureTaskLogsTest extends EasyMockSupport
         azureCloudBlobIterableFactory,
         MAX_KEYS,
         PREFIX_URI,
-        ImmutableList.of(object1, object2));
+        ImmutableList.of(object1, object2),
+        azureStorage
+    );
 
     EasyMock.replay(object1, object2);
     AzureTestUtils.expectDeleteObjects(
@@ -564,7 +566,8 @@ public class AzureTaskLogsTest extends EasyMockSupport
           azureCloudBlobIterableFactory,
           MAX_KEYS,
           PREFIX_URI,
-          ImmutableList.of(object1)
+          ImmutableList.of(object1),
+          azureStorage
       );
 
       EasyMock.replay(object1);
@@ -612,7 +615,9 @@ public class AzureTaskLogsTest extends EasyMockSupport
         azureCloudBlobIterableFactory,
         MAX_KEYS,
         PREFIX_URI,
-        ImmutableList.of(object1, object2));
+        ImmutableList.of(object1, object2),
+        azureStorage
+    );
 
     EasyMock.replay(object1, object2);
     AzureTestUtils.expectDeleteObjects(
@@ -642,7 +647,8 @@ public class AzureTaskLogsTest extends EasyMockSupport
           azureCloudBlobIterableFactory,
           MAX_KEYS,
           PREFIX_URI,
-          ImmutableList.of(object1)
+          ImmutableList.of(object1),
+          azureStorage
       );
 
       EasyMock.replay(object1);
diff --git 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTestUtils.java
 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTestUtils.java
index 67ec5b8e58d..e6c048dbdd2 100644
--- 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTestUtils.java
+++ 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTestUtils.java
@@ -57,11 +57,13 @@ public class AzureTestUtils extends EasyMockSupport
       AzureCloudBlobIterableFactory azureCloudBlobIterableFactory,
       int maxListingLength,
       URI PREFIX_URI,
-      List<CloudBlobHolder> objects)
+      List<CloudBlobHolder> objects,
+      AzureStorage azureStorage
+  )
   {
     AzureCloudBlobIterable azureCloudBlobIterable = 
EasyMock.createMock(AzureCloudBlobIterable.class);
     
EasyMock.expect(azureCloudBlobIterable.iterator()).andReturn(objects.iterator());
-    
EasyMock.expect(azureCloudBlobIterableFactory.create(ImmutableList.of(PREFIX_URI),
 maxListingLength)).andReturn(azureCloudBlobIterable);
+    
EasyMock.expect(azureCloudBlobIterableFactory.create(ImmutableList.of(PREFIX_URI),
 maxListingLength, azureStorage)).andReturn(azureCloudBlobIterable);
     return azureCloudBlobIterable;
   }
 
diff --git a/website/.spelling b/website/.spelling
index 56ad2a69217..e878cd2f9a8 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -2378,6 +2378,9 @@ markUnused
 markUsed
 segmentId
 aggregateMultipleValues
+appRegistrationClientId
+appRegistrationClientSecret
+tenantId
 relativeError
 ddSketch
 DDSketch


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to