This is an automated email from the ASF dual-hosted git repository.

lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f102f9031 Introduce StorageConnector for Azure (#14660)
8f102f9031 is described below

commit 8f102f9031aa7956d189775388852ff6e50faa80
Author: Laksh Singla <[email protected]>
AuthorDate: Wed Aug 9 12:25:27 2023 +0000

    Introduce StorageConnector for Azure (#14660)
    
    The Azure connector is introduced so that MSQ's fault tolerance and durable 
storage can now be used with Microsoft Azure's blob storage. Additionally, the 
newly introduced queries from deep storage can now store and fetch their 
results from Azure's blob storage.
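
    For reference, a minimal sketch of the service properties involved in 
enabling durable storage on Azure. The property names come from the updated 
reference.md below; the container, prefix, and tempDir values are illustrative 
placeholders, not defaults:

    ```properties
    # Common durable storage properties
    druid.msq.intermediate.storage.enable=true
    druid.msq.intermediate.storage.type=azure
    druid.msq.intermediate.storage.tempDir=/tmp/msq-intermediate

    # Azure-specific properties (illustrative values)
    druid.msq.intermediate.storage.container=example-container
    druid.msq.intermediate.storage.prefix=example-prefix
    ```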
---
 docs/multi-stage-query/reference.md                |  29 ++-
 extensions-core/azure-extensions/pom.xml           |  13 +-
 .../druid/data/input/azure/AzureInputSource.java   |   2 +-
 .../druid/storage/azure/AzureByteSource.java       |   2 +-
 .../storage/azure/AzureDataSegmentPusher.java      |   2 +-
 .../apache/druid/storage/azure/AzureStorage.java   | 141 +++++++++--
 .../storage/azure/AzureStorageDruidModule.java     |   2 +-
 .../apache/druid/storage/azure/AzureTaskLogs.java  |   8 +-
 .../storage/azure/output/AzureInputRange.java      |  99 ++++++++
 .../storage/azure/output/AzureOutputConfig.java    | 171 ++++++++++++++
 .../azure/output/AzureStorageConnector.java        | 219 +++++++++++++++++
 .../azure/output/AzureStorageConnectorModule.java  |  13 +-
 .../output/AzureStorageConnectorProvider.java      |  59 +++++
 .../org.apache.druid.initialization.DruidModule    |   1 +
 .../druid/storage/azure/AzureByteSourceTest.java   |   6 +-
 .../storage/azure/AzureDataSegmentPullerTest.java  |   8 +-
 .../storage/azure/AzureDataSegmentPusherTest.java  |  12 +-
 .../druid/storage/azure/AzureStorageTest.java      |  71 ++++++
 .../druid/storage/azure/AzureTaskLogsTest.java     |  60 ++---
 .../storage/azure/output/AzureInputRangeTest.java  |  29 +--
 .../azure/output/AzureOutputConfigTest.java        |  85 +++++++
 .../storage/azure/output/AzureOutputSerdeTest.java | 140 +++++++++++
 .../output/AzureStorageConnectorProviderTest.java  | 148 ++++++++++++
 .../azure/output/AzureStorageConnectorTest.java    | 202 ++++++++++++++++
 .../storage/s3/output/S3StorageConnector.java      | 202 ++++------------
 .../druid/storage/StorageConnectorModule.java      |   6 +-
 .../storage/remote/ChunkingStorageConnector.java   | 215 +++++++++++++++++
 .../remote/ChunkingStorageConnectorParameters.java | 259 +++++++++++++++++++++
 .../ChunkingStorageConnectorParametersTest.java    |  66 ++++++
 .../remote/ChunkingStorageConnectorTest.java       |  87 +++++++
 .../druid/storage/remote/TestStorageConnector.java | 135 +++++++++++
 31 files changed, 2231 insertions(+), 261 deletions(-)

diff --git a/docs/multi-stage-query/reference.md 
b/docs/multi-stage-query/reference.md
index 592aed9a2a..e676d77f57 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -349,20 +349,35 @@ SQL-based ingestion supports using durable storage to 
store intermediate files t
 
 ### Durable storage configurations
 
-The following common service properties control how durable storage behaves:
+Durable storage is supported on Amazon S3 and Microsoft Azure storage. A few 
common configurations control the behavior for both services, as documented 
below. In addition to the common configurations,
+there are a few properties specific to each storage type that must be set.
+
+Common properties to configure the behavior of durable storage:
 
 |Parameter          |Default                                 | Description     
     |
 
|-------------------|----------------------------------------|----------------------|
-|`druid.msq.intermediate.storage.enable` | true | Required. Whether to enable 
durable storage for the cluster. For more information about enabling durable 
storage, see [Durable storage](../operations/durable-storage.md).|
-|`druid.msq.intermediate.storage.type` | `s3` for Amazon S3 | Required. The 
type of storage to use.  `s3` is the only supported storage type.  |
-|`druid.msq.intermediate.storage.bucket` | n/a | The S3 bucket to store 
intermediate files.  |
-|`druid.msq.intermediate.storage.prefix` | n/a | S3 prefix to store 
intermediate stage results. Provide a unique value for the prefix. Don't share 
the same prefix between clusters. If the location includes other files or 
directories, then they will get cleaned up as well.  |
-|`druid.msq.intermediate.storage.tempDir`| n/a | Required. Directory path on 
the local disk to temporarily store intermediate stage results.  |
+|`druid.msq.intermediate.storage.enable` | false | Whether to enable durable 
storage for the cluster. Set it to `true` to enable durable storage. For more 
information about enabling durable storage, see [Durable 
storage](../operations/durable-storage.md).|
+|`druid.msq.intermediate.storage.type` | n/a | Required. The type of storage 
to use. Set it to `s3` for Amazon S3 or `azure` for Azure.|
+|`druid.msq.intermediate.storage.tempDir`| n/a | Required. Directory path on 
the local disk where temporary files are stored while uploading and 
downloading data.|
 |`druid.msq.intermediate.storage.maxRetry` | 10 | Optional. Defines the 
maximum number of times to attempt API calls to the durable storage to avoid 
failures due to transient errors. | 
 |`druid.msq.intermediate.storage.chunkSize` | 100MiB | Optional. Defines the 
size of each chunk to temporarily store in 
`druid.msq.intermediate.storage.tempDir`. The chunk size must be between 5 MiB 
and 5 GiB. A larger chunk size reduces the number of API calls made to the 
durable storage; however, it requires more disk space to store the temporary 
chunks. Druid uses a default of 100MiB if the value is not provided.| 
 
+The following properties must be set in addition to the common properties to 
enable durable storage on S3:
+
+|Parameter          |Default                                 | Description     
     |
+|-------------------|----------------------------------------|----------------------|
+|`druid.msq.intermediate.storage.bucket` | n/a | Required. The S3 bucket where 
the files are uploaded to and downloaded from. |
+|`druid.msq.intermediate.storage.prefix` | n/a | Required. Path prepended to 
all the paths uploaded to the bucket to namespace the connector's files. 
Provide a unique value for the prefix and do not share the same prefix between 
different clusters. If the location includes other files or directories, then 
they might get cleaned up as well. |
+
+The following properties must be set in addition to the common properties to 
enable durable storage on Azure:
+
+|Parameter          |Default                                 | Description     
     |
+|-------------------|----------------------------------------|----------------------|
+|`druid.msq.intermediate.storage.container` | n/a | Required. The Azure 
container where the files are uploaded to and downloaded from.  |
+|`druid.msq.intermediate.storage.prefix` | n/a | Required. Path prepended to 
all the paths uploaded to the container to namespace the connector's files. 
Provide a unique value for the prefix and do not share the same prefix between 
different clusters. If the location includes other files or directories, then 
they might get cleaned up as well. |
 
-In addition to the common service properties, there are certain properties 
that you configure on the Overlord specifically to clean up intermediate files:
+Durable storage creates files on the remote storage, which are cleaned up once 
the job no longer requires them. However, if a task exits abruptly due to a 
failure, these files might not get cleaned up.
+Therefore, you can configure certain properties on the Overlord specifically 
to clean up intermediate files for tasks that have completed and no longer 
require these files:
 
 |Parameter          |Default                                 | Description     
     |
 
|-------------------|----------------------------------------|----------------------|
diff --git a/extensions-core/azure-extensions/pom.xml 
b/extensions-core/azure-extensions/pom.xml
index 8d062990a6..ca9aa970c8 100644
--- a/extensions-core/azure-extensions/pom.xml
+++ b/extensions-core/azure-extensions/pom.xml
@@ -92,7 +92,7 @@
         <dependency>
             <groupId>com.google.inject.extensions</groupId>
             <artifactId>guice-assistedinject</artifactId>
-            <version>${guice.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
@@ -152,6 +152,17 @@
             <artifactId>equalsverifier</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-inline</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
index f68ddfa901..6d0e60fe87 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
@@ -139,7 +139,7 @@ public class AzureInputSource extends CloudObjectInputSource
       public long getObjectSize(CloudObjectLocation location)
       {
         try {
-          final CloudBlob blobWithAttributes = 
storage.getBlobReferenceWithAttributes(
+          final CloudBlob blobWithAttributes = 
storage.getBlockBlobReferenceWithAttributes(
               location.getBucket(),
               location.getPath()
           );
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSource.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSource.java
index a8461de0e8..91af1140cb 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSource.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSource.java
@@ -60,7 +60,7 @@ public class AzureByteSource extends ByteSource
   public InputStream openStream(long offset) throws IOException
   {
     try {
-      return azureStorage.getBlobInputStream(offset, containerName, blobPath);
+      return azureStorage.getBlockBlobInputStream(offset, containerName, 
blobPath);
     }
     catch (StorageException | URISyntaxException e) {
       if (AzureUtils.AZURE_RETRY.apply(e)) {
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java
index 1e46239fd2..9f97256b1d 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java
@@ -183,7 +183,7 @@ public class AzureDataSegmentPusher implements 
DataSegmentPusher
   )
       throws StorageException, IOException, URISyntaxException
   {
-    azureStorage.uploadBlob(compressedSegmentData, 
segmentConfig.getContainer(), azurePath);
+    azureStorage.uploadBlockBlob(compressedSegmentData, 
segmentConfig.getContainer(), azurePath);
 
     final DataSegment outSegment = segment
         .withSize(size)
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
index 4b1539d350..fc1a128e11 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
@@ -23,19 +23,26 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import com.microsoft.azure.storage.ResultContinuation;
 import com.microsoft.azure.storage.ResultSegment;
+import com.microsoft.azure.storage.RetryExponentialRetry;
 import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobDeleteBatchOperation;
 import com.microsoft.azure.storage.blob.BlobListingDetails;
+import com.microsoft.azure.storage.blob.BlobRequestOptions;
 import com.microsoft.azure.storage.blob.CloudBlob;
 import com.microsoft.azure.storage.blob.CloudBlobClient;
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
 import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
 import com.microsoft.azure.storage.blob.ListBlobItem;
+import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.logger.Logger;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.EnumSet;
@@ -48,6 +55,9 @@ public class AzureStorage
 {
   private static final boolean USE_FLAT_BLOB_LISTING = true;
 
+  // Default value from Azure library
+  private static final int DELTA_BACKOFF_MS = 30_000;
+
   private static final Logger log = new Logger(AzureStorage.class);
 
   /**
@@ -70,14 +80,28 @@ public class AzureStorage
 
   public List<String> emptyCloudBlobDirectory(final String containerName, 
final String virtualDirPath)
       throws StorageException, URISyntaxException
+  {
+    return emptyCloudBlobDirectory(containerName, virtualDirPath, null);
+  }
+
+  public List<String> emptyCloudBlobDirectory(final String containerName, 
final String virtualDirPath, final Integer maxAttempts)
+      throws StorageException, URISyntaxException
   {
     List<String> deletedFiles = new ArrayList<>();
     CloudBlobContainer container = 
getOrCreateCloudBlobContainer(containerName);
 
-    for (ListBlobItem blobItem : container.listBlobs(virtualDirPath, true, 
null, null, null)) {
+    Iterable<ListBlobItem> blobItems = container.listBlobs(
+        virtualDirPath,
+        USE_FLAT_BLOB_LISTING,
+        null,
+        getRequestOptionsWithRetry(maxAttempts),
+        null
+    );
+
+    for (ListBlobItem blobItem : blobItems) {
       CloudBlob cloudBlob = (CloudBlob) blobItem;
-      log.info("Removing file[%s] from Azure.", cloudBlob.getName());
-      if (cloudBlob.deleteIfExists()) {
+      log.debug("Removing file[%s] from Azure.", cloudBlob.getName());
+      if (cloudBlob.deleteIfExists(DeleteSnapshotsOption.NONE, null, 
getRequestOptionsWithRetry(maxAttempts), null)) {
         deletedFiles.add(cloudBlob.getName());
       }
     }
@@ -89,7 +113,7 @@ public class AzureStorage
     return deletedFiles;
   }
 
-  public void uploadBlob(final File file, final String containerName, final 
String blobPath)
+  public void uploadBlockBlob(final File file, final String containerName, 
final String blobPath)
       throws IOException, StorageException, URISyntaxException
   {
     CloudBlobContainer container = 
getOrCreateCloudBlobContainer(containerName);
@@ -98,7 +122,29 @@ public class AzureStorage
     }
   }
 
-  public CloudBlob getBlobReferenceWithAttributes(final String containerName, 
final String blobPath)
+  public OutputStream getBlockBlobOutputStream(
+      final String containerName,
+      final String blobPath,
+      @Nullable final Integer streamWriteSizeBytes,
+      Integer maxAttempts
+  ) throws URISyntaxException, StorageException
+  {
+    CloudBlobContainer container = 
getOrCreateCloudBlobContainer(containerName);
+    CloudBlockBlob blockBlobReference = 
container.getBlockBlobReference(blobPath);
+
+    if (blockBlobReference.exists()) {
+      throw new RE("Reference already exists");
+    }
+
+    if (streamWriteSizeBytes != null) {
+      blockBlobReference.setStreamWriteSizeInBytes(streamWriteSizeBytes);
+    }
+
+    return blockBlobReference.openOutputStream(null, 
getRequestOptionsWithRetry(maxAttempts), null);
+
+  }
+
+  public CloudBlob getBlockBlobReferenceWithAttributes(final String 
containerName, final String blobPath)
       throws URISyntaxException, StorageException
   {
     final CloudBlockBlob blobReference = 
getOrCreateCloudBlobContainer(containerName).getBlockBlobReference(blobPath);
@@ -106,28 +152,97 @@ public class AzureStorage
     return blobReference;
   }
 
-  public long getBlobLength(final String containerName, final String blobPath)
+  public long getBlockBlobLength(final String containerName, final String 
blobPath)
+      throws URISyntaxException, StorageException
+  {
+    return getBlockBlobReferenceWithAttributes(containerName, 
blobPath).getProperties().getLength();
+  }
+
+  public InputStream getBlockBlobInputStream(final String containerName, final 
String blobPath)
+      throws URISyntaxException, StorageException
+  {
+    return getBlockBlobInputStream(0L, containerName, blobPath);
+  }
+
+  public InputStream getBlockBlobInputStream(long offset, final String 
containerName, final String blobPath)
+      throws URISyntaxException, StorageException
+  {
+    return getBlockBlobInputStream(offset, null, containerName, blobPath);
+  }
+
+  public InputStream getBlockBlobInputStream(long offset, Long length, final 
String containerName, final String blobPath)
       throws URISyntaxException, StorageException
   {
-    return getBlobReferenceWithAttributes(containerName, 
blobPath).getProperties().getLength();
+    return getBlockBlobInputStream(offset, length, containerName, blobPath, 
null);
+  }
+
+  public InputStream getBlockBlobInputStream(long offset, Long length, final 
String containerName, final String blobPath, Integer maxAttempts)
+      throws URISyntaxException, StorageException
+  {
+    CloudBlobContainer container = 
getOrCreateCloudBlobContainer(containerName);
+    return container.getBlockBlobReference(blobPath)
+                    .openInputStream(offset, length, null, 
getRequestOptionsWithRetry(maxAttempts), null);
   }
 
-  public InputStream getBlobInputStream(final String containerName, final 
String blobPath)
+  public void batchDeleteFiles(String containerName, Iterable<String> paths, 
Integer maxAttempts)
       throws URISyntaxException, StorageException
   {
-    return getBlobInputStream(0L, containerName, blobPath);
+    CloudBlobContainer cloudBlobContainer = 
getOrCreateCloudBlobContainer(containerName);
+    BlobDeleteBatchOperation blobDeleteBatchOperation = new 
BlobDeleteBatchOperation();
+    for (String path : paths) {
+      CloudBlob blobReference = cloudBlobContainer.getBlockBlobReference(path);
+      blobDeleteBatchOperation.addSubOperation(blobReference);
+    }
+    cloudBlobClient.get().executeBatch(blobDeleteBatchOperation, 
getRequestOptionsWithRetry(maxAttempts), null);
   }
 
-  public InputStream getBlobInputStream(long offset, final String 
containerName, final String blobPath)
+  public List<String> listDir(final String containerName, final String 
virtualDirPath)
       throws URISyntaxException, StorageException
   {
+    return listDir(containerName, virtualDirPath, null);
+  }
+
+  public List<String> listDir(final String containerName, final String 
virtualDirPath, final Integer maxAttempts)
+      throws StorageException, URISyntaxException
+  {
+    List<String> files = new ArrayList<>();
     CloudBlobContainer container = 
getOrCreateCloudBlobContainer(containerName);
-    return container.getBlockBlobReference(blobPath).openInputStream(offset, 
null, null, null, null);
+
+    for (ListBlobItem blobItem :
+        container.listBlobs(virtualDirPath, USE_FLAT_BLOB_LISTING, null, 
getRequestOptionsWithRetry(maxAttempts), null)) {
+      CloudBlob cloudBlob = (CloudBlob) blobItem;
+      files.add(cloudBlob.getName());
+    }
+
+    return files;
   }
 
-  public boolean getBlobExists(String container, String blobPath) throws 
URISyntaxException, StorageException
+  public boolean getBlockBlobExists(String container, String blobPath) throws 
URISyntaxException, StorageException
+  {
+    return getBlockBlobExists(container, blobPath, null);
+  }
+
+
+  public boolean getBlockBlobExists(String container, String blobPath, Integer 
maxAttempts)
+      throws URISyntaxException, StorageException
   {
-    return 
getOrCreateCloudBlobContainer(container).getBlockBlobReference(blobPath).exists();
+    return 
getOrCreateCloudBlobContainer(container).getBlockBlobReference(blobPath)
+                                                   .exists(null, 
getRequestOptionsWithRetry(maxAttempts), null);
+  }
+
+  /**
+   * If maxAttempts is provided, this method returns request options with 
retry built in.
+   * Retry backoff is exponential backoff, with maxAttempts set to the one 
provided
+   */
+  @Nullable
+  private BlobRequestOptions getRequestOptionsWithRetry(Integer maxAttempts)
+  {
+    if (maxAttempts == null) {
+      return null;
+    }
+    BlobRequestOptions requestOptions = new BlobRequestOptions();
+    requestOptions.setRetryPolicyFactory(new 
RetryExponentialRetry(DELTA_BACKOFF_MS, maxAttempts));
+    return requestOptions;
   }
 
   @VisibleForTesting
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
index f2f973f11a..674e451de5 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
@@ -51,7 +51,7 @@ import java.util.List;
 public class AzureStorageDruidModule implements DruidModule
 {
 
-  static final String SCHEME = "azure";
+  public static final String SCHEME = "azure";
   public static final String
       STORAGE_CONNECTION_STRING_WITH_KEY = 
"DefaultEndpointsProtocol=%s;AccountName=%s;AccountKey=%s";
   public static final String
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
index 9bfda5ab34..5e6880c14e 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
@@ -95,7 +95,7 @@ public class AzureTaskLogs implements TaskLogs
     try {
       AzureUtils.retryAzureOperation(
           () -> {
-            azureStorage.uploadBlob(logFile, config.getContainer(), taskKey);
+            azureStorage.uploadBlockBlob(logFile, config.getContainer(), 
taskKey);
             return null;
           },
           config.getMaxTries()
@@ -129,12 +129,12 @@ public class AzureTaskLogs implements TaskLogs
   {
     final String container = config.getContainer();
     try {
-      if (!azureStorage.getBlobExists(container, taskKey)) {
+      if (!azureStorage.getBlockBlobExists(container, taskKey)) {
         return Optional.absent();
       }
       try {
         final long start;
-        final long length = azureStorage.getBlobLength(container, taskKey);
+        final long length = azureStorage.getBlockBlobLength(container, 
taskKey);
 
         if (offset > 0 && offset < length) {
           start = offset;
@@ -144,7 +144,7 @@ public class AzureTaskLogs implements TaskLogs
           start = 0;
         }
 
-        InputStream stream = azureStorage.getBlobInputStream(container, 
taskKey);
+        InputStream stream = azureStorage.getBlockBlobInputStream(container, 
taskKey);
         stream.skip(start);
 
         return Optional.of(stream);
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureInputRange.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureInputRange.java
new file mode 100644
index 0000000000..4803dc8b29
--- /dev/null
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureInputRange.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure.output;
+
+import java.util.Objects;
+
+/**
+ * Represents a chunk of the Azure blob
+ */
+public class AzureInputRange
+{
+
+  /**
+   * Starting location in the blob stream
+   */
+  private final long start;
+
+  /**
+   * Size of the blob stream that this object represents
+   */
+  private final long size;
+
+  /**
+   * Container where the blob resides
+   */
+  private final String container;
+
+  /**
+   * Absolute path of the blob
+   */
+  private final String path;
+
+  public AzureInputRange(long start, long size, String container, String path)
+  {
+    this.start = start;
+    this.size = size;
+    this.container = container;
+    this.path = path;
+  }
+
+  public long getStart()
+  {
+    return start;
+  }
+
+  public long getSize()
+  {
+    return size;
+  }
+
+  public String getContainer()
+  {
+    return container;
+  }
+
+  public String getPath()
+  {
+    return path;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    AzureInputRange that = (AzureInputRange) o;
+    return start == that.start
+           && size == that.size
+           && Objects.equals(container, that.container)
+           && Objects.equals(path, that.path);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(start, size, container, path);
+  }
+}
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureOutputConfig.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureOutputConfig.java
new file mode 100644
index 0000000000..7af9c856c5
--- /dev/null
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureOutputConfig.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure.output;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.RetryUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Configuration of the Azure storage connector
+ */
+public class AzureOutputConfig
+{
+  @JsonProperty
+  private final String container;
+
+  @JsonProperty
+  private final String prefix;
+
+  @JsonProperty
+  private final File tempDir;
+
+  @JsonProperty
+  private final HumanReadableBytes chunkSize;
+
+  private static final HumanReadableBytes DEFAULT_CHUNK_SIZE = new 
HumanReadableBytes("4MiB");
+
+  // Minimum limit is self-imposed, so that chunks are appropriately sized, 
and we don't spend a lot of time downloading
+  // the part of the blobs
+  private static final long AZURE_MIN_CHUNK_SIZE_BYTES = new 
HumanReadableBytes("256KiB").getBytes();
+
+  // Maximum limit is imposed by Azure, on the size of one block blob
+  private static final long AZURE_MAX_CHUNK_SIZE_BYTES = new 
HumanReadableBytes("4000MiB").getBytes();
+
+
+  @JsonProperty
+  private final int maxRetry;
+
+  public AzureOutputConfig(
+      @JsonProperty(value = "container", required = true) String container,
+      @JsonProperty(value = "prefix", required = true) String prefix,
+      @JsonProperty(value = "tempDir", required = true) File tempDir,
+      @JsonProperty(value = "chunkSize") @Nullable HumanReadableBytes 
chunkSize,
+      @JsonProperty(value = "maxRetry") @Nullable Integer maxRetry
+  )
+  {
+    this.container = container;
+    this.prefix = prefix;
+    this.tempDir = tempDir;
+    this.chunkSize = chunkSize != null ? chunkSize : DEFAULT_CHUNK_SIZE;
+    this.maxRetry = maxRetry != null ? maxRetry : RetryUtils.DEFAULT_MAX_TRIES;
+    validateFields();
+  }
+
+
+  public String getContainer()
+  {
+    return container;
+  }
+
+  public String getPrefix()
+  {
+    return prefix;
+  }
+
+  public File getTempDir()
+  {
+    return tempDir;
+  }
+
+  public HumanReadableBytes getChunkSize()
+  {
+    return chunkSize;
+  }
+
+  public int getMaxRetry()
+  {
+    return maxRetry;
+  }
+
+  private void validateFields()
+  {
+    if (chunkSize.getBytes() < AZURE_MIN_CHUNK_SIZE_BYTES || 
chunkSize.getBytes() > AZURE_MAX_CHUNK_SIZE_BYTES) {
+      throw InvalidInput.exception(
+          "'chunkSize' [%d] bytes to the AzureConfig should be between [%d] 
bytes and [%d] bytes",
+          chunkSize.getBytes(),
+          AZURE_MIN_CHUNK_SIZE_BYTES,
+          AZURE_MAX_CHUNK_SIZE_BYTES
+      );
+    }
+
+    try {
+      FileUtils.mkdirp(tempDir);
+    }
+    catch (IOException e) {
+      throw DruidException.forPersona(DruidException.Persona.ADMIN)
+                          .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                          .build(e, "Unable to create temporary directory 
[%s]", tempDir.getAbsolutePath());
+    }
+
+    if (!tempDir.canRead() || !tempDir.canWrite()) {
+      throw DruidException.forPersona(DruidException.Persona.ADMIN)
+                          .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                          .build(
+                              "Cannot read or write on the 'tempDir' [%s]. "
+                              + "Please provide a different path to store the intermediate contents of AzureStorageConnector",
+                              tempDir.getAbsolutePath()
+                          );
+    }
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    AzureOutputConfig that = (AzureOutputConfig) o;
+    return maxRetry == that.maxRetry
+           && Objects.equals(container, that.container)
+           && Objects.equals(prefix, that.prefix)
+           && Objects.equals(tempDir, that.tempDir)
+           && Objects.equals(chunkSize, that.chunkSize);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(container, prefix, tempDir, chunkSize, maxRetry);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "AzureOutputConfig{" +
+           "container='" + container + '\'' +
+           ", prefix='" + prefix + '\'' +
+           ", tempDir=" + tempDir +
+           ", chunkSize=" + chunkSize +
+           ", maxRetry=" + maxRetry +
+           '}';
+  }
+}
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java
new file mode 100644
index 0000000000..657043797e
--- /dev/null
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure.output;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import com.microsoft.azure.storage.StorageException;
+import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
+import org.apache.druid.storage.azure.AzureStorage;
+import org.apache.druid.storage.azure.AzureUtils;
+import org.apache.druid.storage.remote.ChunkingStorageConnector;
+import org.apache.druid.storage.remote.ChunkingStorageConnectorParameters;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Implementation of the storage connector that facilitates reading and writing from Azure's blob storage.
+ * This extends the {@link ChunkingStorageConnector} so that the downloads are in a chunked manner.
+ */
+public class AzureStorageConnector extends ChunkingStorageConnector<AzureInputRange>
+{
+
+  private static final String DELIM = "/";
+  private static final Joiner JOINER = Joiner.on(DELIM).skipNulls();
+
+  private final AzureOutputConfig config;
+  private final AzureStorage azureStorage;
+
+  public AzureStorageConnector(
+      final AzureOutputConfig config,
+      final AzureStorage azureStorage
+  )
+  {
+    this.config = config;
+    this.azureStorage = azureStorage;
+  }
+
+  @Override
+  public ChunkingStorageConnectorParameters<AzureInputRange> buildInputParams(String path) throws IOException
+  {
+    try {
+      return buildInputParams(path, 0, azureStorage.getBlockBlobLength(config.getContainer(), objectPath(path)));
+    }
+    catch (URISyntaxException | StorageException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public ChunkingStorageConnectorParameters<AzureInputRange> buildInputParams(String path, long from, long size)
+  {
+    ChunkingStorageConnectorParameters.Builder<AzureInputRange> parameters = new ChunkingStorageConnectorParameters.Builder<>();
+    parameters.tempDirSupplier(config::getTempDir);
+    parameters.maxRetry(config.getMaxRetry());
+    parameters.cloudStoragePath(objectPath(path));
+    parameters.retryCondition(AzureUtils.AZURE_RETRY);
+    parameters.start(from);
+    parameters.end(from + size);
+    parameters.objectSupplier((start, end) -> new AzureInputRange(
+        start,
+        end - start,
+        config.getContainer(),
+        objectPath(path)
+    ));
+    parameters.objectOpenFunction(
+        new ObjectOpenFunction<AzureInputRange>()
+        {
+          @Override
+          public InputStream open(AzureInputRange inputRange) throws IOException
+          {
+            try {
+              return azureStorage.getBlockBlobInputStream(
+                  inputRange.getStart(),
+                  inputRange.getSize(),
+                  inputRange.getContainer(),
+                  inputRange.getPath(),
+                  config.getMaxRetry()
+              );
+            }
+            catch (URISyntaxException | StorageException e) {
+              throw new IOException(e);
+            }
+          }
+
+          @Override
+          public InputStream open(AzureInputRange inputRange, long offset) throws IOException
+          {
+            AzureInputRange newInputRange = new AzureInputRange(
+                inputRange.getStart() + offset,
+                Math.max(inputRange.getSize() - offset, 0),
+                inputRange.getContainer(),
+                inputRange.getPath()
+            );
+            return open(newInputRange);
+          }
+        }
+    );
+
+    return parameters.build();
+  }
+
+  @Override
+  public boolean pathExists(String path) throws IOException
+  {
+    try {
+      return azureStorage.getBlockBlobExists(config.getContainer(), objectPath(path), config.getMaxRetry());
+    }
+    catch (URISyntaxException | StorageException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public OutputStream write(String path) throws IOException
+  {
+    try {
+      return azureStorage.getBlockBlobOutputStream(
+          config.getContainer(),
+          objectPath(path),
+          config.getChunkSize().getBytesInInt(),
+          config.getMaxRetry()
+      );
+    }
+    catch (URISyntaxException | StorageException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void deleteFile(String path) throws IOException
+  {
+    try {
+      azureStorage.batchDeleteFiles(
+          config.getContainer(),
+          Collections.singletonList(objectPath(path)),
+          config.getMaxRetry()
+      );
+    }
+    catch (URISyntaxException | StorageException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void deleteFiles(Iterable<String> paths) throws IOException
+  {
+    try {
+      azureStorage.batchDeleteFiles(
+          config.getContainer(),
+          Iterables.transform(paths, this::objectPath),
+          config.getMaxRetry()
+      );
+    }
+    catch (StorageException | URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void deleteRecursively(String path) throws IOException
+  {
+    try {
+      azureStorage.emptyCloudBlobDirectory(config.getContainer(), objectPath(path), config.getMaxRetry());
+    }
+    catch (StorageException | URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public Iterator<String> listDir(String dirName) throws IOException
+  {
+    final String prefixBasePath = objectPath(dirName);
+    List<String> paths;
+    try {
+      paths = azureStorage.listDir(config.getContainer(), prefixBasePath, config.getMaxRetry());
+    }
+    catch (StorageException | URISyntaxException e) {
+      throw new IOException(e);
+    }
+
+    return paths.stream().map(path -> {
+      String[] size = path.split(prefixBasePath, 2);
+      if (size.length > 1) {
+        return size[1];
+      } else {
+        return "";
+      }
+    }).iterator();
+  }
+
+  private String objectPath(String path)
+  {
+    return JOINER.join(config.getPrefix(), path);
+  }
+}
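[editor's note, not part of the patch] The `listDir` method above relativizes each blob path against the prefix base path using `String.split(prefixBasePath, 2)`. A standalone sketch of just that transformation is below; the class name and sample paths are made up for illustration and do not appear in the commit.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ListDirSketch
{
  // Mirrors the mapping in AzureStorageConnector#listDir: split each path on the
  // prefix (String.split treats it as a regex, so this relies on the prefix
  // containing no regex metacharacters) and keep the remainder, else "".
  static List<String> stripPrefix(List<String> paths, String prefixBasePath)
  {
    return paths.stream().map(path -> {
      String[] parts = path.split(prefixBasePath, 2);
      return parts.length > 1 ? parts[1] : "";
    }).collect(Collectors.toList());
  }

  public static void main(String[] args)
  {
    List<String> relative = stripPrefix(
        Arrays.asList("druid/out/chunk-0", "druid/out/dir/chunk-1", "unrelated"),
        "druid/out"
    );
    System.out.println(relative);
  }
}
```

Note that paths not containing the prefix collapse to the empty string rather than being filtered out, matching the committed behavior.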
diff --git a/processing/src/main/java/org/apache/druid/storage/StorageConnectorModule.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorModule.java
similarity index 75%
copy from processing/src/main/java/org/apache/druid/storage/StorageConnectorModule.java
copy to extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorModule.java
index 9682ac01d8..b2cdda0eb2 100644
--- a/processing/src/main/java/org/apache/druid/storage/StorageConnectorModule.java
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorModule.java
@@ -17,24 +17,25 @@
  * under the License.
  */
 
-package org.apache.druid.storage;
+package org.apache.druid.storage.azure.output;
 
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.common.collect.ImmutableList;
 import com.google.inject.Binder;
 import org.apache.druid.initialization.DruidModule;
-import org.apache.druid.storage.local.LocalFileStorageConnectorProvider;
 
+import java.util.Collections;
 import java.util.List;
 
-public class StorageConnectorModule implements DruidModule
+public class AzureStorageConnectorModule implements DruidModule
 {
   @Override
   public List<? extends Module> getJacksonModules()
   {
-    return ImmutableList.of(new SimpleModule(StorageConnector.class.getSimpleName()).registerSubtypes(
-        LocalFileStorageConnectorProvider.class));
+    return Collections.singletonList(
+        new SimpleModule(AzureStorageConnectorModule.class.getSimpleName())
+            .registerSubtypes(AzureStorageConnectorProvider.class)
+    );
   }
 
   @Override
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java
new file mode 100644
index 0000000000..4264801f4a
--- /dev/null
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure.output;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.storage.StorageConnector;
+import org.apache.druid.storage.StorageConnectorProvider;
+import org.apache.druid.storage.azure.AzureStorage;
+import org.apache.druid.storage.azure.AzureStorageDruidModule;
+
+import javax.annotation.Nullable;
+import java.io.File;
+
+@JsonTypeName(AzureStorageDruidModule.SCHEME)
+public class AzureStorageConnectorProvider extends AzureOutputConfig implements StorageConnectorProvider
+{
+
+  @JacksonInject
+  AzureStorage azureStorage;
+
+  @JsonCreator
+  public AzureStorageConnectorProvider(
+      @JsonProperty(value = "container", required = true) String container,
+      @JsonProperty(value = "prefix", required = true) String prefix,
+      @JsonProperty(value = "tempDir", required = true) File tempDir,
+      @JsonProperty(value = "chunkSize") @Nullable HumanReadableBytes chunkSize,
+      @JsonProperty(value = "maxRetry") @Nullable Integer maxRetry
+  )
+  {
+    super(container, prefix, tempDir, chunkSize, maxRetry);
+  }
+
+  @Override
+  public StorageConnector get()
+  {
+    return new AzureStorageConnector(this, azureStorage);
+  }
+}
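[editor's note, not part of the patch] For orientation, the provider above deserializes a JSON object with exactly the `@JsonProperty` fields shown, discriminated by the type name taken from `AzureStorageDruidModule.SCHEME` (which, assuming the usual Druid scheme constant, is `azure`). A rough illustration follows; all values here are invented, and the property key under which this object is configured depends on how `StorageConnectorProvider` is wired into a deployment:

```json
{
  "type": "azure",
  "container": "my-container",
  "prefix": "druid/durable-storage",
  "tempDir": "/tmp/druid-azure",
  "chunkSize": "100MiB",
  "maxRetry": 3
}
```

`chunkSize` and `maxRetry` are optional and fall back to the defaults in `AzureOutputConfig`; `chunkSize` must also pass the Azure min/max validation shown earlier in the patch.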
diff --git a/extensions-core/azure-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-core/azure-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
index 298f1d39d1..a801f540d3 100644
--- a/extensions-core/azure-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
+++ b/extensions-core/azure-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -13,4 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+org.apache.druid.storage.azure.output.AzureStorageConnectorModule
 org.apache.druid.storage.azure.AzureStorageDruidModule
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureByteSourceTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureByteSourceTest.java
index 649001cc35..f54ef2e403 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureByteSourceTest.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureByteSourceTest.java
@@ -41,7 +41,7 @@ public class AzureByteSourceTest extends EasyMockSupport
     AzureStorage azureStorage = createMock(AzureStorage.class);
     InputStream stream = createMock(InputStream.class);
 
-    EasyMock.expect(azureStorage.getBlobInputStream(NO_OFFSET, containerName, blobPath)).andReturn(stream);
+    EasyMock.expect(azureStorage.getBlockBlobInputStream(NO_OFFSET, containerName, blobPath)).andReturn(stream);
 
     replayAll();
 
@@ -60,7 +60,7 @@ public class AzureByteSourceTest extends EasyMockSupport
     AzureStorage azureStorage = createMock(AzureStorage.class);
     InputStream stream = createMock(InputStream.class);
 
-    EasyMock.expect(azureStorage.getBlobInputStream(OFFSET, containerName, blobPath)).andReturn(stream);
+    EasyMock.expect(azureStorage.getBlockBlobInputStream(OFFSET, containerName, blobPath)).andReturn(stream);
 
     replayAll();
 
@@ -78,7 +78,7 @@ public class AzureByteSourceTest extends EasyMockSupport
     final String blobPath = "/path/to/file";
     AzureStorage azureStorage = createMock(AzureStorage.class);
 
-    EasyMock.expect(azureStorage.getBlobInputStream(NO_OFFSET, containerName, blobPath)).andThrow(
+    EasyMock.expect(azureStorage.getBlockBlobInputStream(NO_OFFSET, containerName, blobPath)).andThrow(
         new StorageException(
             "",
             "",
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java
index fc98434962..13820072cb 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java
@@ -62,7 +62,7 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
       final InputStream zipStream = new FileInputStream(pulledFile);
 
       EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, BLOB_PATH));
-      EasyMock.expect(azureStorage.getBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andReturn(zipStream);
+      EasyMock.expect(azureStorage.getBlockBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andReturn(zipStream);
 
       replayAll();
 
@@ -94,7 +94,7 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
       final InputStream zipStream = new FileInputStream(pulledFile);
 
       EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, BLOB_PATH));
-      EasyMock.expect(azureStorage.getBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andReturn(zipStream);
+      EasyMock.expect(azureStorage.getBlockBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andReturn(zipStream);
 
       replayAll();
 
@@ -123,7 +123,7 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
     final File outDir = FileUtils.createTempDir();
     try {
       EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, BLOB_PATH));
-      EasyMock.expect(azureStorage.getBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andThrow(
+      EasyMock.expect(azureStorage.getBlockBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andThrow(
           new URISyntaxException(
               "error",
               "error",
@@ -155,7 +155,7 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
     final File outDir = FileUtils.createTempDir();
     try {
       EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, BLOB_PATH));
-      EasyMock.expect(azureStorage.getBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andThrow(
+      EasyMock.expect(azureStorage.getBlockBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andThrow(
           new StorageException(null, null, 0, null, null)
       ).atLeastOnce();
 
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java
index f3e65c7923..b18fabbc3d 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java
@@ -115,7 +115,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
     Files.write(DATA, tmp);
 
     String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath);
-    azureStorage.uploadBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.eq(azurePath));
+    azureStorage.uploadBlockBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.eq(azurePath));
     EasyMock.expectLastCall();
 
     replayAll();
@@ -145,7 +145,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
     Files.write(DATA, tmp);
 
     String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath);
-    azureStorage.uploadBlob(
+    azureStorage.uploadBlockBlob(
         EasyMock.anyObject(File.class),
         EasyMock.eq(CONTAINER_NAME),
         EasyMock.eq(PREFIX + "/" + azurePath)
@@ -178,7 +178,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
     Files.write(DATA, tmp);
 
     String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath);
-    azureStorage.uploadBlob(
+    azureStorage.uploadBlockBlob(
         EasyMock.anyObject(File.class),
         EasyMock.eq(CONTAINER_NAME),
         EasyMock.matches(UNIQUE_MATCHER_NO_PREFIX)
@@ -211,7 +211,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
     Files.write(DATA, tmp);
 
     String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath);
-    azureStorage.uploadBlob(
+    azureStorage.uploadBlockBlob(
         EasyMock.anyObject(File.class),
         EasyMock.eq(CONTAINER_NAME),
         EasyMock.matches(UNIQUE_MATCHER_PREFIX)
@@ -245,7 +245,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
     final long size = DATA.length;
 
     String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath);
-    azureStorage.uploadBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.eq(azurePath));
+    azureStorage.uploadBlockBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.eq(azurePath));
     EasyMock.expectLastCall().andThrow(new URISyntaxException("", ""));
 
     replayAll();
@@ -284,7 +284,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
     final File compressedSegmentData = new File("index.zip");
     final String azurePath = pusher.getAzurePath(DATA_SEGMENT, false);
 
-    azureStorage.uploadBlob(compressedSegmentData, CONTAINER_NAME, azurePath);
+    azureStorage.uploadBlockBlob(compressedSegmentData, CONTAINER_NAME, azurePath);
     EasyMock.expectLastCall();
 
     replayAll();
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
new file mode 100644
index 0000000000..9ae0854640
--- /dev/null
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure;
+
+import com.google.common.collect.ImmutableList;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.blob.ListBlobItem;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+
+public class AzureStorageTest
+{
+
+  AzureStorage azureStorage;
+  CloudBlobClient cloudBlobClient = Mockito.mock(CloudBlobClient.class);
+  CloudBlobContainer cloudBlobContainer = Mockito.mock(CloudBlobContainer.class);
+
+  @Before
+  public void setup() throws URISyntaxException, StorageException
+  {
+    Mockito.doReturn(cloudBlobContainer).when(cloudBlobClient).getContainerReference(ArgumentMatchers.anyString());
+    azureStorage = new AzureStorage(() -> cloudBlobClient);
+  }
+
+  @Test
+  public void testListDir() throws URISyntaxException, StorageException
+  {
+    List<ListBlobItem> listBlobItems = ImmutableList.of(
+        new CloudBlockBlob(new URI("azure://dummy.com/container/blobName"))
+    );
+
+    Mockito.doReturn(listBlobItems).when(cloudBlobContainer).listBlobs(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.anyBoolean(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any()
+
+    );
+    Assert.assertEquals(ImmutableList.of("blobName"), azureStorage.listDir("test", ""));
+
+  }
+}
+
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
index 297545e4ca..2575793176 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
@@ -97,7 +97,7 @@ public class AzureTaskLogsTest extends EasyMockSupport
     try {
       final File logFile = new File(tmpDir, "log");
 
-      azureStorage.uploadBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/log");
+      azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/log");
       EasyMock.expectLastCall();
 
       replayAll();
@@ -119,7 +119,7 @@ public class AzureTaskLogsTest extends EasyMockSupport
     try {
       final File logFile = new File(tmpDir, "log");
 
-      azureStorage.uploadBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/log");
+      azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/log");
       EasyMock.expectLastCall().andThrow(new IOException());
 
       replayAll();
@@ -141,7 +141,7 @@ public class AzureTaskLogsTest extends EasyMockSupport
     try {
       final File logFile = new File(tmpDir, "log");
 
-      azureStorage.uploadBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/report.json");
+      azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/report.json");
       EasyMock.expectLastCall();
 
       replayAll();
@@ -163,7 +163,7 @@ public class AzureTaskLogsTest extends EasyMockSupport
     try {
       final File logFile = new File(tmpDir, "status.json");
 
-      azureStorage.uploadBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/status.json");
+      azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/status.json");
       EasyMock.expectLastCall();
 
       replayAll();
@@ -185,7 +185,7 @@ public class AzureTaskLogsTest extends EasyMockSupport
     try {
       final File logFile = new File(tmpDir, "log");
 
-      azureStorage.uploadBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/report.json");
+      azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/report.json");
       EasyMock.expectLastCall().andThrow(new IOException());
 
       replayAll();
@@ -205,9 +205,9 @@ public class AzureTaskLogsTest extends EasyMockSupport
     final String testLog = "hello this is a log";
 
     final String blobPath = PREFIX + "/" + TASK_ID + "/log";
-    EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(true);
-    EasyMock.expect(azureStorage.getBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
-    EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER, blobPath)).andReturn(
+    EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andReturn(true);
+    EasyMock.expect(azureStorage.getBlockBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
+    EasyMock.expect(azureStorage.getBlockBlobInputStream(CONTAINER, blobPath)).andReturn(
         new ByteArrayInputStream(testLog.getBytes(StandardCharsets.UTF_8)));
 
 
@@ -228,9 +228,9 @@ public class AzureTaskLogsTest extends EasyMockSupport
     final String testLog = "hello this is a log";
 
     final String blobPath = PREFIX + "/" + TASK_ID + "/log";
-    EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(true);
-    EasyMock.expect(azureStorage.getBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
-    EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER, blobPath)).andReturn(
+    EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andReturn(true);
+    EasyMock.expect(azureStorage.getBlockBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
+    EasyMock.expect(azureStorage.getBlockBlobInputStream(CONTAINER, blobPath)).andReturn(
         new ByteArrayInputStream(testLog.getBytes(StandardCharsets.UTF_8)));
 
 
@@ -251,9 +251,9 @@ public class AzureTaskLogsTest extends EasyMockSupport
     final String testLog = "hello this is a log";
 
     final String blobPath = PREFIX + "/" + TASK_ID + "/log";
-    EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(true);
-    EasyMock.expect(azureStorage.getBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
-    EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER, blobPath)).andReturn(
+    EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andReturn(true);
+    EasyMock.expect(azureStorage.getBlockBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
+    EasyMock.expect(azureStorage.getBlockBlobInputStream(CONTAINER, blobPath)).andReturn(
         new ByteArrayInputStream(StringUtils.toUtf8(testLog)));
 
 
@@ -274,9 +274,9 @@ public class AzureTaskLogsTest extends EasyMockSupport
     final String testLog = "hello this is a log";
 
     final String blobPath = PREFIX + "/" + TASK_ID + "/report.json";
-    EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(true);
-    EasyMock.expect(azureStorage.getBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
-    EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER, blobPath)).andReturn(
+    EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andReturn(true);
+    EasyMock.expect(azureStorage.getBlockBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
+    EasyMock.expect(azureStorage.getBlockBlobInputStream(CONTAINER, blobPath)).andReturn(
         new ByteArrayInputStream(testLog.getBytes(StandardCharsets.UTF_8)));
 
 
@@ -297,7 +297,7 @@ public class AzureTaskLogsTest extends EasyMockSupport
     final String testLog = "hello this is a log";
 
     final String blobPath = PREFIX + "/" + TASK_ID_NOT_FOUND + "/report.json";
-    EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(false);
+    EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andReturn(false);
 
     replayAll();
 
@@ -315,9 +315,9 @@ public class AzureTaskLogsTest extends EasyMockSupport
     final String testLog = "hello this is a log";
 
     final String blobPath = PREFIX + "/" + TASK_ID + "/report.json";
-    EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(true);
-    EasyMock.expect(azureStorage.getBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
-    EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER, blobPath)).andThrow(
+    EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andReturn(true);
+    EasyMock.expect(azureStorage.getBlockBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
+    EasyMock.expect(azureStorage.getBlockBlobInputStream(CONTAINER, blobPath)).andThrow(
         new URISyntaxException("", ""));
 
 
@@ -336,7 +336,7 @@ public class AzureTaskLogsTest extends EasyMockSupport
     final String testLog = "hello this is a log";
 
     final String blobPath = PREFIX + "/" + TASK_ID + "/report.json";
-    EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andThrow(new URISyntaxException("", ""));
+    EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andThrow(new URISyntaxException("", ""));
 
     replayAll();
 
@@ -351,9 +351,9 @@ public class AzureTaskLogsTest extends EasyMockSupport
     final String taskStatus = "{}";
 
     final String blobPath = PREFIX + "/" + TASK_ID + "/status.json";
-    EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(true);
-    EasyMock.expect(azureStorage.getBlobLength(CONTAINER, blobPath)).andReturn((long) taskStatus.length());
-    EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER, blobPath)).andReturn(
+    EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andReturn(true);
+    EasyMock.expect(azureStorage.getBlockBlobLength(CONTAINER, blobPath)).andReturn((long) taskStatus.length());
+    EasyMock.expect(azureStorage.getBlockBlobInputStream(CONTAINER, blobPath)).andReturn(
         new ByteArrayInputStream(taskStatus.getBytes(StandardCharsets.UTF_8)));
 
 
@@ -372,7 +372,7 @@ public class AzureTaskLogsTest extends EasyMockSupport
  public void test_streamTaskStatus_blobDoesNotExist_returnsAbsent() throws Exception
   {
     final String blobPath = PREFIX + "/" + TASK_ID_NOT_FOUND + "/status.json";
-    EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(false);
+    EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andReturn(false);
 
     replayAll();
 
@@ -390,9 +390,9 @@ public class AzureTaskLogsTest extends EasyMockSupport
     final String taskStatus = "{}";
 
     final String blobPath = PREFIX + "/" + TASK_ID + "/status.json";
-    EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(true);
-    EasyMock.expect(azureStorage.getBlobLength(CONTAINER, blobPath)).andReturn((long) taskStatus.length());
-    EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER, blobPath)).andThrow(
+    EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andReturn(true);
+    EasyMock.expect(azureStorage.getBlockBlobLength(CONTAINER, blobPath)).andReturn((long) taskStatus.length());
+    EasyMock.expect(azureStorage.getBlockBlobInputStream(CONTAINER, blobPath)).andThrow(
         new URISyntaxException("", ""));
 
 
@@ -409,7 +409,7 @@ public class AzureTaskLogsTest extends EasyMockSupport
  public void test_streamTaskStatus_exceptionWhenCheckingBlobExistence_throwsException() throws Exception
   {
     final String blobPath = PREFIX + "/" + TASK_ID + "/status.json";
-    EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andThrow(new URISyntaxException("", ""));
+    EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andThrow(new URISyntaxException("", ""));
 
     replayAll();
 
diff --git a/processing/src/main/java/org/apache/druid/storage/StorageConnectorModule.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureInputRangeTest.java
similarity index 53%
copy from processing/src/main/java/org/apache/druid/storage/StorageConnectorModule.java
copy to extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureInputRangeTest.java
index 9682ac01d8..4753132d1c 100644
--- a/processing/src/main/java/org/apache/druid/storage/StorageConnectorModule.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureInputRangeTest.java
@@ -17,29 +17,18 @@
  * under the License.
  */
 
-package org.apache.druid.storage;
+package org.apache.druid.storage.azure.output;
 
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.common.collect.ImmutableList;
-import com.google.inject.Binder;
-import org.apache.druid.initialization.DruidModule;
-import org.apache.druid.storage.local.LocalFileStorageConnectorProvider;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Test;
 
-import java.util.List;
-
-public class StorageConnectorModule implements DruidModule
+public class AzureInputRangeTest
 {
-  @Override
-  public List<? extends Module> getJacksonModules()
+  @Test
+  public void testEquals()
   {
-    return ImmutableList.of(new SimpleModule(StorageConnector.class.getSimpleName()).registerSubtypes(
-        LocalFileStorageConnectorProvider.class));
-  }
-
-  @Override
-  public void configure(Binder binder)
-  {
-
+    EqualsVerifier.forClass(AzureInputRange.class)
+                  .usingGetClass()
+                  .verify();
   }
 }
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputConfigTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputConfigTest.java
new file mode 100644
index 0000000000..ab3104adf4
--- /dev/null
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputConfigTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure.output;
+
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.ISE;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+
+public class AzureOutputConfigTest
+{
+
+  @Rule
+  public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private static final String CONTAINER = "container";
+  private static final String PREFIX = "prefix";
+  private static final int MAX_RETRY_COUNT = 0;
+
+  @Test
+  public void testTooLargeChunkSize()
+  {
+    HumanReadableBytes chunkSize = new HumanReadableBytes("4001MiB");
+    Assert.assertThrows(
+        DruidException.class,
+        () -> new AzureOutputConfig(CONTAINER, PREFIX, temporaryFolder.newFolder(), chunkSize, MAX_RETRY_COUNT)
+    );
+  }
+
+  @Test
+  public void testTempDirectoryNotWritable() throws IOException
+  {
+    File tempDir = temporaryFolder.newFolder();
+    if (!tempDir.setWritable(false)) {
+      throw new ISE("Unable to change the permission of temp folder for %s", this.getClass().getName());
+    }
+    //noinspection ResultOfObjectAllocationIgnored
+    Assert.assertThrows(
+        DruidException.class,
+        () -> new AzureOutputConfig(CONTAINER, PREFIX, tempDir, null, MAX_RETRY_COUNT)
+    );
+  }
+
+  @Test
+  public void testTempDirectoryNotPresentButWritable() throws IOException
+  {
+    File tempDir = new File(temporaryFolder.newFolder() + "/notPresent1/notPresent2/notPresent3");
+    //noinspection ResultOfObjectAllocationIgnored
+    new AzureOutputConfig(CONTAINER, PREFIX, tempDir, null, MAX_RETRY_COUNT);
+  }
+
+  @Test
+  public void testTempDirectoryPresent() throws IOException
+  {
+    File tempDir = new File(temporaryFolder.newFolder() + "/notPresent1/notPresent2/notPresent3");
+    FileUtils.mkdirp(tempDir);
+    //noinspection ResultOfObjectAllocationIgnored
+    new AzureOutputConfig(CONTAINER, PREFIX, tempDir, null, MAX_RETRY_COUNT);
+  }
+}
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputSerdeTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputSerdeTest.java
new file mode 100644
index 0000000000..ecf99666ce
--- /dev/null
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputSerdeTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure.output;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.exc.MismatchedInputException;
+import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+public class AzureOutputSerdeTest
+{
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  @Test
+  public void sanity() throws IOException
+  {
+    String json = jsonStringReadyForAssert("{\n"
+                                           + "  \"container\": \"TEST\",\n"
+                                           + "  \"prefix\": \"abc\",\n"
+                                           + "  \"tempDir\": \"/tmp\",\n"
+                                           + "  \"chunkSize\":104857600,\n"
+                                           + "  \"maxRetry\": 2\n"
+                                           + "}\n");
+
+    AzureOutputConfig azureOutputConfig = new AzureOutputConfig(
+        "TEST",
+        "abc",
+        new File("/tmp"),
+        HumanReadableBytes.valueOf(HumanReadableBytes.parse("100Mib")),
+        2
+    );
+
+    Assert.assertEquals(
+        json,
+        MAPPER.writeValueAsString(azureOutputConfig)
+    );
+
+    Assert.assertEquals(azureOutputConfig, MAPPER.readValue(json, AzureOutputConfig.class));
+  }
+
+  @Test
+  public void noPrefix()
+  {
+    String json = jsonStringReadyForAssert("{\n"
+                                           + "  \"container\": \"TEST\",\n"
+                                           + "  \"tempDir\": \"/tmp\",\n"
+                                           + "  \"chunkSize\":104857600,\n"
+                                           + "  \"maxRetry\": 2\n"
+                                           + "}\n");
+    Assert.assertThrows(MismatchedInputException.class, () -> MAPPER.readValue(json, AzureOutputConfig.class));
+  }
+
+  @Test
+  public void noContainer()
+  {
+    String json = jsonStringReadyForAssert("{\n"
+                                           + "  \"prefix\": \"abc\",\n"
+                                           + "  \"tempDir\": \"/tmp\",\n"
+                                           + "  \"chunkSize\":104857600,\n"
+                                           + "  \"maxRetry\": 2\n"
+                                           + "}\n");
+    Assert.assertThrows(MismatchedInputException.class, () -> MAPPER.readValue(json, AzureOutputConfig.class));
+  }
+
+  @Test
+  public void noTempDir()
+  {
+    String json = jsonStringReadyForAssert("{\n"
+                                           + "  \"prefix\": \"abc\",\n"
+                                           + "  \"container\": \"TEST\",\n"
+                                           + "  \"chunkSize\":104857600,\n"
+                                           + "  \"maxRetry\": 2\n"
+                                           + "}\n");
+    Assert.assertThrows(MismatchedInputException.class, () -> MAPPER.readValue(json, AzureOutputConfig.class));
+  }
+
+  @Test
+  public void leastArguments() throws JsonProcessingException
+  {
+    String json = jsonStringReadyForAssert("{\n"
+                                           + "  \"tempDir\": \"/tmp\",\n"
+                                           + "  \"prefix\": \"abc\",\n"
+                                           + "  \"container\": \"TEST\"\n"
+                                           + "}\n");
+
+    AzureOutputConfig azureOutputConfig = new AzureOutputConfig(
+        "TEST",
+        "abc",
+        new File("/tmp"),
+        null,
+        null
+    );
+    Assert.assertEquals(azureOutputConfig, MAPPER.readValue(json, AzureOutputConfig.class));
+  }
+
+
+  @Test
+  public void testChunkValidation()
+  {
+
+    String json = jsonStringReadyForAssert("{\n"
+                                           + "  \"prefix\": \"abc\",\n"
+                                           + "  \"container\": \"TEST\",\n"
+                                           + "  \"tempDir\": \"/tmp\",\n"
+                                           + "  \"chunkSize\":104,\n"
+                                           + "  \"maxRetry\": 2\n"
+                                           + "}\n");
+    Assert.assertThrows(ValueInstantiationException.class, () -> MAPPER.readValue(json, AzureOutputConfig.class));
+  }
+
+  private static String jsonStringReadyForAssert(String input)
+  {
+    return StringUtils.removeChar(StringUtils.removeChar(input, '\n'), ' ');
+  }
+}
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java
new file mode 100644
index 0000000000..50a856c712
--- /dev/null
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure.output;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.ProvisionException;
+import com.google.inject.name.Names;
+import org.apache.druid.guice.JsonConfigProvider;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.StartupInjectorBuilder;
+import org.apache.druid.storage.StorageConnector;
+import org.apache.druid.storage.StorageConnectorModule;
+import org.apache.druid.storage.StorageConnectorProvider;
+import org.apache.druid.storage.azure.AzureStorage;
+import org.apache.druid.storage.azure.AzureStorageDruidModule;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Properties;
+
+public class AzureStorageConnectorProviderTest
+{
+  private static final String CUSTOM_NAMESPACE = "custom";
+
+  @Test
+  public void createAzureStorageFactoryWithRequiredProperties()
+  {
+
+    final Properties properties = new Properties();
+    properties.setProperty(CUSTOM_NAMESPACE + ".type", "azure");
+    properties.setProperty(CUSTOM_NAMESPACE + ".container", "container");
+    properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix");
+    properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp");
+    StorageConnectorProvider s3StorageConnectorProvider = getStorageConnectorProvider(properties);
+
+    Assert.assertTrue(s3StorageConnectorProvider instanceof AzureStorageConnectorProvider);
+    Assert.assertTrue(s3StorageConnectorProvider.get() instanceof AzureStorageConnector);
+    Assert.assertEquals("container", ((AzureStorageConnectorProvider) s3StorageConnectorProvider).getContainer());
+    Assert.assertEquals("prefix", ((AzureStorageConnectorProvider) s3StorageConnectorProvider).getPrefix());
+    Assert.assertEquals(new File("/tmp"), ((AzureStorageConnectorProvider) s3StorageConnectorProvider).getTempDir());
+
+  }
+
+  @Test
+  public void createAzureStorageFactoryWithMissingPrefix()
+  {
+
+    final Properties properties = new Properties();
+    properties.setProperty(CUSTOM_NAMESPACE + ".type", "s3");
+    properties.setProperty(CUSTOM_NAMESPACE + ".container", "container");
+    properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp");
+    Assert.assertThrows(
+        "Missing required creator property 'prefix'",
+        ProvisionException.class,
+        () -> getStorageConnectorProvider(properties)
+    );
+  }
+
+
+  @Test
+  public void createAzureStorageFactoryWithMissingContainer()
+  {
+
+    final Properties properties = new Properties();
+    properties.setProperty(CUSTOM_NAMESPACE + ".type", "azure");
+    properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix");
+    properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp");
+    Assert.assertThrows(
+        "Missing required creator property 'container'",
+        ProvisionException.class,
+        () -> getStorageConnectorProvider(properties)
+    );
+  }
+
+  @Test
+  public void createAzureStorageFactoryWithMissingTempDir()
+  {
+
+    final Properties properties = new Properties();
+    properties.setProperty(CUSTOM_NAMESPACE + ".type", "azure");
+    properties.setProperty(CUSTOM_NAMESPACE + ".container", "container");
+    properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix");
+
+    Assert.assertThrows(
+        "Missing required creator property 'tempDir'",
+        ProvisionException.class,
+        () -> getStorageConnectorProvider(properties)
+    );
+  }
+
+  private StorageConnectorProvider getStorageConnectorProvider(Properties properties)
+  {
+    StartupInjectorBuilder startupInjectorBuilder = new StartupInjectorBuilder().add(
+        new AzureStorageDruidModule(),
+        new StorageConnectorModule(),
+        new AzureStorageConnectorModule(),
+        binder -> {
+          JsonConfigProvider.bind(
+              binder,
+              CUSTOM_NAMESPACE,
+              StorageConnectorProvider.class,
+              Names.named(CUSTOM_NAMESPACE)
+          );
+
+          binder.bind(Key.get(StorageConnector.class, Names.named(CUSTOM_NAMESPACE)))
+                .toProvider(Key.get(StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE)))
+                .in(LazySingleton.class);
+        }
+    ).withProperties(properties);
+
+    Injector injector = startupInjectorBuilder.build();
+    injector.getInstance(ObjectMapper.class).registerModules(new AzureStorageConnectorModule().getJacksonModules());
+    injector.getInstance(ObjectMapper.class).setInjectableValues(
+        new InjectableValues.Std()
+            .addValue(
+                AzureStorage.class,
+                EasyMock.mock(AzureStorage.class)
+            ));
+
+
+    return injector.getInstance(Key.get(
+        StorageConnectorProvider.class,
+        Names.named(CUSTOM_NAMESPACE)
+    ));
+  }
+}
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java
new file mode 100644
index 0000000000..f8592c32ea
--- /dev/null
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure.output;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.microsoft.azure.storage.StorageException;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.storage.StorageConnector;
+import org.apache.druid.storage.azure.AzureStorage;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+
+public class AzureStorageConnectorTest
+{
+
+  private static final String CONTAINER = "CONTAINER";
+  private static final String PREFIX = "P/R/E/F/I/X";
+  public static final String TEST_FILE = "test.csv";
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private StorageConnector storageConnector;
+  private final AzureStorage azureStorage = EasyMock.createMock(AzureStorage.class);
+
+  @Before
+  public void setup() throws IOException
+  {
+    storageConnector = new AzureStorageConnector(
+        new AzureOutputConfig(CONTAINER, PREFIX, temporaryFolder.newFolder(), null, null),
+        azureStorage
+    );
+  }
+
+
+  @Test
+  public void testPathExistsSuccess() throws URISyntaxException, StorageException, IOException
+  {
+    final Capture<String> bucket = Capture.newInstance();
+    final Capture<String> path = Capture.newInstance();
+    EasyMock.reset(azureStorage);
+    EasyMock.expect(azureStorage.getBlockBlobExists(EasyMock.capture(bucket), EasyMock.capture(path), EasyMock.anyInt()))
+            .andReturn(true);
+    EasyMock.replay(azureStorage);
+    Assert.assertTrue(storageConnector.pathExists(TEST_FILE));
+    Assert.assertEquals(CONTAINER, bucket.getValue());
+    Assert.assertEquals(PREFIX + "/" + TEST_FILE, path.getValue());
+    EasyMock.verify(azureStorage);
+  }
+
+  @Test
+  public void testPathExistsNotFound() throws URISyntaxException, StorageException, IOException
+  {
+    final Capture<String> bucket = Capture.newInstance();
+    final Capture<String> path = Capture.newInstance();
+    EasyMock.reset(azureStorage);
+    EasyMock.expect(azureStorage.getBlockBlobExists(EasyMock.capture(bucket), EasyMock.capture(path), EasyMock.anyInt()))
+            .andReturn(false);
+    EasyMock.replay(azureStorage);
+    Assert.assertFalse(storageConnector.pathExists(TEST_FILE));
+    Assert.assertEquals(CONTAINER, bucket.getValue());
+    Assert.assertEquals(PREFIX + "/" + TEST_FILE, path.getValue());
+    EasyMock.verify(azureStorage);
+  }
+
+  @Test
+  public void testRead() throws URISyntaxException, StorageException, IOException
+  {
+    EasyMock.reset(azureStorage);
+
+    String data = "test";
+    EasyMock.expect(azureStorage.getBlockBlobLength(EasyMock.anyString(), EasyMock.anyString()))
+            .andReturn(4L);
+    EasyMock.expect(
+        azureStorage.getBlockBlobInputStream(
+            EasyMock.anyLong(),
+            EasyMock.anyLong(),
+            EasyMock.anyString(),
+            EasyMock.anyString(),
+            EasyMock.anyInt()
+        )
+    ).andReturn(IOUtils.toInputStream(data, StandardCharsets.UTF_8));
+
+    EasyMock.replay(azureStorage);
+    InputStream is = storageConnector.read(TEST_FILE);
+    byte[] dataBytes = new byte[data.length()];
+    Assert.assertEquals(data.length(), is.read(dataBytes));
+    Assert.assertEquals(-1, is.read());
+    Assert.assertEquals(data, new String(dataBytes, StandardCharsets.UTF_8));
+
+    EasyMock.reset(azureStorage);
+  }
+
+  @Test
+  public void testReadRange() throws URISyntaxException, StorageException, IOException
+  {
+    String data = "test";
+
+    for (int start = 0; start < data.length(); ++start) {
+      for (long length = 1; length <= data.length() - start; ++length) {
+        String dataQueried = data.substring(start, start + ((Long) length).intValue());
+        EasyMock.reset(azureStorage);
+        EasyMock.expect(azureStorage.getBlockBlobInputStream(
+                    EasyMock.anyLong(),
+                    EasyMock.anyLong(),
+                    EasyMock.anyString(),
+                    EasyMock.anyString(),
+                    EasyMock.anyInt()
+                ))
+                .andReturn(IOUtils.toInputStream(dataQueried, StandardCharsets.UTF_8));
+        EasyMock.replay(azureStorage);
+
+        InputStream is = storageConnector.readRange(TEST_FILE, start, length);
+        byte[] dataBytes = new byte[((Long) length).intValue()];
+        Assert.assertEquals(length, is.read(dataBytes));
+        Assert.assertEquals(-1, is.read());
+        Assert.assertEquals(dataQueried, new String(dataBytes, StandardCharsets.UTF_8));
+        EasyMock.reset(azureStorage);
+      }
+    }
+  }
+
+  @Test
+  public void testDeleteSinglePath() throws URISyntaxException, StorageException, IOException
+  {
+    EasyMock.reset(azureStorage);
+    Capture<String> containerCapture = EasyMock.newCapture();
+    Capture<Iterable<String>> pathsCapture = EasyMock.newCapture();
+    azureStorage.batchDeleteFiles(
+        EasyMock.capture(containerCapture),
+        EasyMock.capture(pathsCapture),
+        EasyMock.anyInt()
+    );
+    EasyMock.replay(azureStorage);
+    storageConnector.deleteFile(TEST_FILE);
+    Assert.assertEquals(CONTAINER, containerCapture.getValue());
+    Assert.assertEquals(Collections.singletonList(PREFIX + "/" + TEST_FILE), pathsCapture.getValue());
+    EasyMock.reset(azureStorage);
+  }
+
+  @Test
+  public void testDeleteMultiplePaths() throws URISyntaxException, StorageException, IOException
+  {
+    EasyMock.reset(azureStorage);
+    Capture<String> containerCapture = EasyMock.newCapture();
+    Capture<Iterable<String>> pathsCapture = EasyMock.newCapture();
+    azureStorage.batchDeleteFiles(EasyMock.capture(containerCapture), EasyMock.capture(pathsCapture), EasyMock.anyInt());
+    EasyMock.replay(azureStorage);
+    storageConnector.deleteFiles(ImmutableList.of(TEST_FILE + "_1.part", TEST_FILE + "_2.part"));
+    Assert.assertEquals(CONTAINER, containerCapture.getValue());
+    Assert.assertEquals(
+        ImmutableList.of(
+            PREFIX + "/" + TEST_FILE + "_1.part",
+            PREFIX + "/" + TEST_FILE + "_2.part"
+        ),
+        Lists.newArrayList(pathsCapture.getValue())
+    );
+    EasyMock.reset(azureStorage);
+  }
+
+  @Test
+  public void testListDir() throws URISyntaxException, StorageException, IOException
+  {
+    EasyMock.reset(azureStorage);
+    EasyMock.expect(azureStorage.listDir(EasyMock.anyString(), EasyMock.anyString(), EasyMock.anyInt()))
+            .andReturn(ImmutableList.of(PREFIX + "/x/y/z/" + TEST_FILE, PREFIX + "/p/q/r/" + TEST_FILE));
+    EasyMock.replay(azureStorage);
+    List<String> ret = Lists.newArrayList(storageConnector.listDir(""));
+    Assert.assertEquals(ImmutableList.of("x/y/z/" + TEST_FILE, "p/q/r/" + TEST_FILE), ret);
+    EasyMock.reset(azureStorage);
+  }
+}
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
index 81dfb57476..a68ed9c1c0 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
@@ -27,40 +27,29 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
-import org.apache.commons.io.input.NullInputStream;
 import org.apache.druid.data.input.impl.CloudObjectLocation;
-import org.apache.druid.data.input.impl.RetryingInputStream;
 import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
 import org.apache.druid.java.util.common.FileUtils;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.IOE;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.storage.StorageConnector;
+import org.apache.druid.storage.remote.ChunkingStorageConnector;
+import org.apache.druid.storage.remote.ChunkingStorageConnectorParameters;
 import org.apache.druid.storage.s3.S3Utils;
 import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
 
 import javax.annotation.Nonnull;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.io.SequenceInputStream;
 import java.util.ArrayList;
-import java.util.Enumeration;
 import java.util.Iterator;
 import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * In this implementation, all remote calls to aws s3 are retried {@link S3OutputConfig#getMaxRetry()} times.
  */
-public class S3StorageConnector implements StorageConnector
+public class S3StorageConnector extends ChunkingStorageConnector<GetObjectRequest>
 {
   private static final Logger log = new Logger(S3StorageConnector.class);
 
@@ -69,7 +58,6 @@ public class S3StorageConnector implements StorageConnector
 
   private static final String DELIM = "/";
   private static final Joiner JOINER = Joiner.on(DELIM).skipNulls();
-  private static final long DOWNLOAD_MAX_CHUNK_SIZE = 100_000_000;
   private static final int MAX_NUMBER_OF_LISTINGS = 1000;
 
   public S3StorageConnector(S3OutputConfig config, ServerSideEncryptingAmazonS3 serverSideEncryptingAmazonS3)
@@ -105,169 +93,61 @@ public class S3StorageConnector implements StorageConnector
   }
 
   @Override
-  public InputStream read(String path)
+  public ChunkingStorageConnectorParameters<GetObjectRequest> buildInputParams(String path)
   {
-    return buildInputStream(new GetObjectRequest(config.getBucket(), objectPath(path)), path);
-  }
-
-  @Override
-  public InputStream readRange(String path, long from, long size)
-  {
-    if (from < 0 || size < 0) {
-      throw new IAE(
-          "Invalid arguments for reading %s. from = %d, readSize = %d",
-          objectPath(path),
-          from,
-          size
+    long size;
+    try {
+      size = S3Utils.retryS3Operation(
+          () -> this.s3Client.getObjectMetadata(config.getBucket(), 
objectPath(path)).getInstanceLength(),
+          config.getMaxRetry()
       );
     }
-    return buildInputStream(
-        new GetObjectRequest(config.getBucket(), 
objectPath(path)).withRange(from, from + size - 1),
-        path
-    );
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return buildInputParams(path, 0, size);
   }
 
-  private InputStream buildInputStream(GetObjectRequest getObjectRequest, 
String path)
+  @Override
+  public ChunkingStorageConnectorParameters<GetObjectRequest> 
buildInputParams(String path, long from, long size)
   {
-    // fetch the size of the whole object to make chunks
-    long readEnd;
-    AtomicLong currReadStart = new AtomicLong(0);
-    if (getObjectRequest.getRange() != null) {
-      currReadStart.set(getObjectRequest.getRange()[0]);
-      readEnd = getObjectRequest.getRange()[1] + 1;
-    } else {
-      try {
-        readEnd = S3Utils.retryS3Operation(
-            () -> this.s3Client.getObjectMetadata(config.getBucket(), 
objectPath(path)).getInstanceLength(),
-            config.getMaxRetry()
-        );
-      }
-      catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-    AtomicBoolean isSequenceStreamClosed = new AtomicBoolean(false);
-
-    // build a sequence input stream from chunks
-    return new SequenceInputStream(new Enumeration<InputStream>()
+    ChunkingStorageConnectorParameters.Builder<GetObjectRequest> builder = new 
ChunkingStorageConnectorParameters.Builder<>();
+    builder.start(from);
+    builder.end(from + size);
+    builder.cloudStoragePath(objectPath(path));
+    builder.tempDirSupplier(config::getTempDir);
+    builder.maxRetry(config.getMaxRetry());
+    builder.retryCondition(S3Utils.S3RETRY);
+    builder.objectSupplier((start, end) -> new 
GetObjectRequest(config.getBucket(), objectPath(path)).withRange(start, end - 
1));
+    builder.objectOpenFunction(new ObjectOpenFunction<GetObjectRequest>()
     {
-      boolean initStream = false;
       @Override
-      public boolean hasMoreElements()
+      public InputStream open(GetObjectRequest object)
       {
-        // checking if the stream was already closed. If it was, then don't 
iterate over the remaining chunks
-        // SequenceInputStream's close method closes all the chunk streams in 
its close. Since we're opening them
-        // lazily, we don't need to close them.
-        if (isSequenceStreamClosed.get()) {
-          return false;
-        }
-        // don't stop until the whole object is downloaded
-        return currReadStart.get() < readEnd;
-      }
-
-      @Override
-      public InputStream nextElement()
-      {
-        // since Sequence input stream calls nextElement in the constructor, 
we start chunking as soon as we call read.
-        // to avoid that we pass a nullInputStream for the first iteration.
-        if (!initStream) {
-          initStream = true;
-          return new NullInputStream();
-        }
-        File outFile = new File(config.getTempDir().getAbsolutePath(), 
UUID.randomUUID().toString());
-        // in a single chunk, only download a maximum of 
DOWNLOAD_MAX_CHUNK_SIZE
-        long endPoint = Math.min(currReadStart.get() + 
DOWNLOAD_MAX_CHUNK_SIZE, readEnd) - 1;
         try {
-          if (!outFile.createNewFile()) {
-            throw new IOE(
-                StringUtils.format(
-                    "Could not create temporary file [%s] for copying [%s]",
-                    outFile.getAbsolutePath(),
-                    objectPath(path)
-                )
-            );
-          }
-          FileUtils.copyLarge(
-              () -> new RetryingInputStream<>(
-                  new GetObjectRequest(
-                      config.getBucket(),
-                      objectPath(path)
-                  ).withRange(currReadStart.get(), endPoint),
-                  new ObjectOpenFunction<GetObjectRequest>()
-                  {
-                    @Override
-                    public InputStream open(GetObjectRequest object)
-                    {
-                      try {
-                        return S3Utils.retryS3Operation(
-                            () -> 
s3Client.getObject(object).getObjectContent(),
-                            config.getMaxRetry()
-                        );
-                      }
-                      catch (Exception e) {
-                        throw new RuntimeException(e);
-                      }
-                    }
-
-                    @Override
-                    public InputStream open(GetObjectRequest object, long 
offset)
-                    {
-                      if (object.getRange() != null) {
-                        long[] oldRange = object.getRange();
-                        object.setRange(oldRange[0] + offset, oldRange[1]);
-                      } else {
-                        object.setRange(offset);
-                      }
-                      return open(object);
-                    }
-                  },
-                  S3Utils.S3RETRY,
-                  config.getMaxRetry()
-              ),
-              outFile,
-              new byte[8 * 1024],
-              Predicates.alwaysFalse(),
-              1,
-              StringUtils.format("Retrying copying of [%s] to [%s]", 
objectPath(path), outFile.getAbsolutePath())
+          return S3Utils.retryS3Operation(
+              () -> s3Client.getObject(object).getObjectContent(),
+              config.getMaxRetry()
           );
         }
-        catch (IOException e) {
-          throw new RE(e, StringUtils.format("Unable to copy [%s] to [%s]", 
objectPath(path), outFile));
-        }
-        try {
-          AtomicBoolean isClosed = new AtomicBoolean(false);
-          return new FileInputStream(outFile)
-          {
-            @Override
-            public void close() throws IOException
-            {
-              // close should be idempotent
-              if (isClosed.get()) {
-                return;
-              }
-              isClosed.set(true);
-              super.close();
-              // since endPoint is inclusive in s3's get request API, the next 
currReadStart is endpoint + 1
-              currReadStart.set(endPoint + 1);
-              if (!outFile.delete()) {
-                throw new RE("Cannot delete temp file [%s]", outFile);
-              }
-            }
-          };
-        }
-        catch (FileNotFoundException e) {
-          throw new RE(e, StringUtils.format("Unable to find temp file [%s]", 
outFile));
+        catch (Exception e) {
+          throw new RuntimeException(e);
         }
       }
-    })
-    {
+
       @Override
-      public void close() throws IOException
+      public InputStream open(GetObjectRequest object, long offset)
       {
-        isSequenceStreamClosed.set(true);
-        super.close();
+        if (object.getRange() != null) {
+          long[] oldRange = object.getRange();
+          object.setRange(oldRange[0] + offset, oldRange[1]);
+        } else {
+          object.setRange(offset);
+        }
+        return open(object);
       }
-    };
+    });
+    return builder.build();
   }
 
   @Override
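The rewrite above reduces `S3StorageConnector` to supplying chunk parameters; the chunked read itself boils down to splitting a byte range into fixed-size pieces (100 MB by default in the new base class). A standalone sketch of that splitting, using illustrative names that are not part of the Druid API:

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkRanges
{
  // Returns [start, end) pairs covering [from, from + size), each at most chunkSize long.
  public static List<long[]> split(long from, long size, long chunkSize)
  {
    List<long[]> ranges = new ArrayList<>();
    long end = from + size;
    for (long start = from; start < end; start += chunkSize) {
      ranges.add(new long[]{start, Math.min(start + chunkSize, end)});
    }
    return ranges;
  }

  public static void main(String[] args)
  {
    // A 250-byte read with 100-byte chunks yields three ranges: [0,100), [100,200), [200,250).
    System.out.println(ChunkRanges.split(0, 250, 100).size()); // 3
  }
}
```

Note that S3's `GetObjectRequest#withRange` takes an inclusive end, which is why the connector passes `end - 1` when building each request.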
diff --git 
a/processing/src/main/java/org/apache/druid/storage/StorageConnectorModule.java 
b/processing/src/main/java/org/apache/druid/storage/StorageConnectorModule.java
index 9682ac01d8..a0bf3a91f0 100644
--- 
a/processing/src/main/java/org/apache/druid/storage/StorageConnectorModule.java
+++ 
b/processing/src/main/java/org/apache/druid/storage/StorageConnectorModule.java
@@ -33,8 +33,10 @@ public class StorageConnectorModule implements DruidModule
   @Override
   public List<? extends Module> getJacksonModules()
   {
-    return ImmutableList.of(new 
SimpleModule(StorageConnector.class.getSimpleName()).registerSubtypes(
-        LocalFileStorageConnectorProvider.class));
+    return ImmutableList.of(
+        new SimpleModule(StorageConnector.class.getSimpleName())
+            .registerSubtypes(LocalFileStorageConnectorProvider.class)
+    );
   }
 
   @Override
diff --git 
a/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnector.java
 
b/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnector.java
new file mode 100644
index 0000000000..5d181b7248
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnector.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.remote;
+
+import com.google.common.base.Predicates;
+import org.apache.commons.io.input.NullInputStream;
+import org.apache.druid.data.input.impl.RetryingInputStream;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+import java.util.Enumeration;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An abstract implementation of a storage connector that downloads files from remote storage in chunks
+ * and presents the downloaded chunks as a single {@link InputStream} to consumers of the connector.
+ * This implementation has an advantage over keeping an InputStream to the remote source open, since the
+ * connection does not need to stay open for the entire duration of the read.
+ * Check out {@link ChunkingStorageConnectorParameters} to see the inputs required to support chunking.
+ */
+public abstract class ChunkingStorageConnector<T> implements StorageConnector
+{
+  /**
+   * Default chunk size for the storage connector. Set to 100 MB to keep the chunk size small relative to the
+   * total frame size, while also preventing a large number of calls to the remote storage. While fetching a
+   * single file, up to 100 MB of disk space is required.
+   */
+  private static final long DOWNLOAD_MAX_CHUNK_SIZE_BYTES = 100_000_000;
+
+  /**
+   * Default fetch buffer size used while copying from the remote location to the download file. Matches the
+   * default buffer size used by {@link org.apache.commons.io.IOUtils}.
+   */
+  private static final int FETCH_BUFFER_SIZE_BYTES = 8 * 1024;
+
+  private final long chunkSizeBytes;
+
+  public ChunkingStorageConnector()
+  {
+    this(DOWNLOAD_MAX_CHUNK_SIZE_BYTES);
+  }
+
+  public ChunkingStorageConnector(
+      final long chunkSizeBytes
+  )
+  {
+    this.chunkSizeBytes = chunkSizeBytes;
+  }
+
+  @Override
+  public InputStream read(String path) throws IOException
+  {
+    return buildInputStream(buildInputParams(path));
+  }
+
+  @Override
+  public InputStream readRange(String path, long from, long size)
+  {
+    return buildInputStream(buildInputParams(path, from, size));
+  }
+
+  public abstract ChunkingStorageConnectorParameters<T> 
buildInputParams(String path) throws IOException;
+
+  public abstract ChunkingStorageConnectorParameters<T> 
buildInputParams(String path, long from, long size);
+
+  private InputStream buildInputStream(ChunkingStorageConnectorParameters<T> 
params)
+  {
+    // Position from where the read needs to be resumed
+    final AtomicLong currentReadStartPosition = new 
AtomicLong(params.getStart());
+
+    // Final position, exclusive
+    long readEnd = params.getEnd();
+
+    AtomicBoolean isSequenceStreamClosed = new AtomicBoolean(false);
+
+    return new SequenceInputStream(
+
+        new Enumeration<InputStream>()
+        {
+          boolean initStream = false;
+
+          @Override
+          public boolean hasMoreElements()
+          {
+            // Checking if the stream was already closed. If it was, then 
don't iterate over the remaining chunks
+            // SequenceInputStream's close method closes all the chunk streams 
in its close. Since we're opening them
+            // lazily, we don't need to close them.
+            if (isSequenceStreamClosed.get()) {
+              return false;
+            }
+            // Don't stop until the whole object is downloaded
+            return currentReadStartPosition.get() < readEnd;
+          }
+
+          @Override
+          public InputStream nextElement()
+          {
+            if (!initStream) {
+              initStream = true;
+              return new NullInputStream();
+            }
+
+            File outFile = new File(
+                params.getTempDirSupplier().get().getAbsolutePath(),
+                UUID.randomUUID().toString()
+            );
+
+            long currentReadEndPosition = Math.min(
+                currentReadStartPosition.get() + chunkSizeBytes,
+                readEnd
+            );
+
+            try {
+              if (!outFile.createNewFile()) {
+                throw new IOE(
+                    StringUtils.format(
+                        "Could not create temporary file [%s] for copying 
[%s]",
+                        outFile.getAbsolutePath(),
+                        params.getCloudStoragePath()
+                    )
+                );
+              }
+
+              FileUtils.copyLarge(
+                  () -> new RetryingInputStream<>(
+                      
params.getObjectSupplier().getObject(currentReadStartPosition.get(), 
currentReadEndPosition),
+                      params.getObjectOpenFunction(),
+                      params.getRetryCondition(),
+                      params.getMaxRetry()
+                  ),
+                  outFile,
+                  new byte[FETCH_BUFFER_SIZE_BYTES],
+                  Predicates.alwaysFalse(),
+                  1,
+                  StringUtils.format(
+                      "Retrying copying of [%s] to [%s]",
+                      params.getCloudStoragePath(),
+                      outFile.getAbsolutePath()
+                  )
+              );
+            }
+            catch (IOException e) {
+              throw new RE(e, StringUtils.format("Unable to copy [%s] to 
[%s]", params.getCloudStoragePath(), outFile));
+            }
+
+            try {
+              AtomicBoolean fileInputStreamClosed = new AtomicBoolean(false);
+              return new FileInputStream(outFile)
+              {
+                @Override
+                public void close() throws IOException
+                {
+                  // close should be idempotent
+                  if (fileInputStreamClosed.get()) {
+                    return;
+                  }
+                  fileInputStreamClosed.set(true);
+                  super.close();
+                  currentReadStartPosition.set(currentReadEndPosition);
+                  if (!outFile.delete()) {
+                    throw new RE("Cannot delete temp file [%s]", outFile);
+                  }
+                }
+
+              };
+            }
+            catch (FileNotFoundException e) {
+              throw new RE(e, StringUtils.format("Unable to find temp file 
[%s]", outFile));
+            }
+          }
+        }
+    )
+    {
+      @Override
+      public void close() throws IOException
+      {
+        isSequenceStreamClosed.set(true);
+        super.close();
+      }
+    };
+  }
+
+  public interface GetObjectFromRangeFunction<T>
+  {
+    T getObject(long start, long end);
+  }
+}
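The core trick in `buildInputStream` above is presenting lazily created per-chunk streams as one `SequenceInputStream`. That pattern can be shown with plain JDK types; `LazyChunks` and its helpers are illustrative names, not Druid API:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Enumeration;

public class LazyChunks
{
  // Presents `data` as one stream, materializing each chunk only when the
  // SequenceInputStream advances to it.
  public static InputStream chunked(byte[] data, int chunkSize)
  {
    return new SequenceInputStream(new Enumeration<InputStream>()
    {
      private int pos = 0;

      @Override
      public boolean hasMoreElements()
      {
        return pos < data.length;
      }

      @Override
      public InputStream nextElement()
      {
        // Note: SequenceInputStream pulls its first element eagerly in the
        // constructor; the real connector sidesteps that upfront download cost
        // by handing it a NullInputStream first.
        int end = Math.min(pos + chunkSize, data.length);
        InputStream chunk = new ByteArrayInputStream(data, pos, end - pos);
        pos = end;
        return chunk;
      }
    });
  }

  // Reads the stream to completion, for demonstration purposes.
  public static String drain(InputStream in)
  {
    try {
      StringBuilder sb = new StringBuilder();
      int b;
      while ((b = in.read()) != -1) {
        sb.append((char) b);
      }
      return sb.toString();
    }
    catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args)
  {
    System.out.println(drain(chunked("hello chunked world".getBytes(), 4)));
  }
}
```

In the real connector each "chunk" is a retried remote download spooled to a temp file, and the temp file is deleted when its `FileInputStream` is closed.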
diff --git 
a/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnectorParameters.java
 
b/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnectorParameters.java
new file mode 100644
index 0000000000..03f5ecad1b
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnectorParameters.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.remote;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
+
+import java.io.File;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+/**
+ * POJO storing the parameters required to support chunked downloads by a {@link ChunkingStorageConnector}.
+ * Implementations of {@link ChunkingStorageConnector} provide a way to build this object, which carries the
+ * information required to support chunking. When serving a call to
+ * {@link org.apache.druid.storage.StorageConnector#readRange(String, long, long)}, a chunking storage connector
+ * fetches the required chunks using the information in this POJO.
+ */
+public class ChunkingStorageConnectorParameters<T>
+{
+  /**
+   * Starting point from where to begin reading the cloud object. This is 
inclusive.
+   */
+  private final long start;
+
+  /**
+   * Ending point at which to stop reading the cloud object. This is exclusive.
+   */
+  private final long end;
+
+  /**
+   * Absolute storage path of the cloud object.
+   */
+  private final String cloudStoragePath;
+
+  /**
+   * Given a range (start inclusive, end exclusive), fetches the object representing the provided range of the
+   * remote object.
+   */
+  private final ChunkingStorageConnector.GetObjectFromRangeFunction<T> 
objectSupplier;
+
+  /**
+   * Fetching function that opens an input stream for the range described by the given object.
+   */
+  private final ObjectOpenFunction<T> objectOpenFunction;
+
+  /**
+   * Condition to initiate a retry if downloading the chunk errors out
+   */
+  private final Predicate<Throwable> retryCondition;
+
+  /**
+   * Max number of retries while reading the storage connector
+   */
+  private final int maxRetry;
+
+  /**
+   * Temporary directory where the chunks are stored
+   */
+  private final Supplier<File> tempDirSupplier;
+
+  public ChunkingStorageConnectorParameters(
+      long start,
+      long end,
+      String cloudStoragePath,
+      ChunkingStorageConnector.GetObjectFromRangeFunction<T> objectSupplier,
+      ObjectOpenFunction<T> objectOpenFunction,
+      Predicate<Throwable> retryCondition,
+      int maxRetry,
+      Supplier<File> tempDirSupplier
+  )
+  {
+    this.start = start;
+    this.end = end;
+    this.cloudStoragePath = cloudStoragePath;
+    this.objectSupplier = objectSupplier;
+    this.objectOpenFunction = objectOpenFunction;
+    this.retryCondition = retryCondition;
+    this.maxRetry = maxRetry;
+    this.tempDirSupplier = tempDirSupplier;
+  }
+
+  public long getStart()
+  {
+    return start;
+  }
+
+  public long getEnd()
+  {
+    return end;
+  }
+
+  public String getCloudStoragePath()
+  {
+    return cloudStoragePath;
+  }
+
+  public ChunkingStorageConnector.GetObjectFromRangeFunction<T> 
getObjectSupplier()
+  {
+    return objectSupplier;
+  }
+
+  public ObjectOpenFunction<T> getObjectOpenFunction()
+  {
+    return objectOpenFunction;
+  }
+
+  public Predicate<Throwable> getRetryCondition()
+  {
+    return retryCondition;
+  }
+
+  public int getMaxRetry()
+  {
+    return maxRetry;
+  }
+
+  public Supplier<File> getTempDirSupplier()
+  {
+    return tempDirSupplier;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ChunkingStorageConnectorParameters<?> that = 
(ChunkingStorageConnectorParameters<?>) o;
+    return start == that.start &&
+           end == that.end &&
+           maxRetry == that.maxRetry &&
+           Objects.equals(cloudStoragePath, that.cloudStoragePath) &&
+           Objects.equals(objectSupplier, that.objectSupplier) &&
+           Objects.equals(objectOpenFunction, that.objectOpenFunction) &&
+           Objects.equals(retryCondition, that.retryCondition) &&
+           Objects.equals(tempDirSupplier, that.tempDirSupplier);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(
+        start,
+        end,
+        cloudStoragePath,
+        objectSupplier,
+        objectOpenFunction,
+        retryCondition,
+        maxRetry,
+        tempDirSupplier
+    );
+  }
+
+  /**
+   * Builder for {@link ChunkingStorageConnectorParameters}. Performs null checks and asserts preconditions
+   * before building the instance.
+   */
+  public static class Builder<T>
+  {
+    private long start;
+    private long end;
+    private String cloudStoragePath;
+    private ChunkingStorageConnector.GetObjectFromRangeFunction<T> 
objectSupplier;
+    private ObjectOpenFunction<T> objectOpenFunction;
+    private Predicate<Throwable> retryCondition;
+    private int maxRetry;
+    private Supplier<File> tempDirSupplier;
+
+
+    public Builder<T> start(long start)
+    {
+      this.start = start;
+      return this;
+    }
+
+    public Builder<T> end(long end)
+    {
+      this.end = end;
+      return this;
+    }
+
+    public Builder<T> cloudStoragePath(String cloudStoragePath)
+    {
+      this.cloudStoragePath = cloudStoragePath;
+      return this;
+    }
+
+    public Builder<T> 
objectSupplier(ChunkingStorageConnector.GetObjectFromRangeFunction<T> 
objectSupplier)
+    {
+      this.objectSupplier = objectSupplier;
+      return this;
+    }
+
+    public Builder<T> objectOpenFunction(ObjectOpenFunction<T> 
objectOpenFunction)
+    {
+      this.objectOpenFunction = objectOpenFunction;
+      return this;
+    }
+
+    public Builder<T> retryCondition(Predicate<Throwable> retryCondition)
+    {
+      this.retryCondition = retryCondition;
+      return this;
+    }
+
+    public Builder<T> maxRetry(int maxRetry)
+    {
+      this.maxRetry = maxRetry;
+      return this;
+    }
+
+    public Builder<T> tempDirSupplier(Supplier<File> tempDirSupplier)
+    {
+      this.tempDirSupplier = tempDirSupplier;
+      return this;
+    }
+
+    public ChunkingStorageConnectorParameters<T> build()
+    {
+      Preconditions.checkArgument(start >= 0, "'start' not provided or an 
incorrect value [%s] passed", start);
+      Preconditions.checkArgument(end >= 0, "'end' not provided or an 
incorrect value [%s] passed", end);
+      Preconditions.checkArgument(start <= end, "'start' should not be greater 
than 'end'");
+      Preconditions.checkArgument(maxRetry >= 0, "'maxRetry' not provided or 
an incorrect value [%s] passed", maxRetry);
+      return new ChunkingStorageConnectorParameters(
+          start,
+          end,
+          Preconditions.checkNotNull(cloudStoragePath, "'cloudStoragePath' not 
supplied"),
+          Preconditions.checkNotNull(objectSupplier, "'objectSupplier' not 
supplied"),
+          Preconditions.checkNotNull(objectOpenFunction, "'objectOpenFunction' 
not supplied"),
+          Preconditions.checkNotNull(retryCondition, "'retryCondition' not 
supplied"),
+          maxRetry,
+          Preconditions.checkNotNull(tempDirSupplier, "'tempDirSupplier' not 
supplied")
+      );
+    }
+  }
+}
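The builder above validates its inputs with Guava's `Preconditions` before constructing the POJO. The same fail-fast pattern can be sketched without Guava using plain `IllegalArgumentException`; `RangeParams` is a hypothetical stand-in, not Druid API:

```java
public class RangeParams
{
  private final long start;
  private final long end;

  private RangeParams(long start, long end)
  {
    this.start = start;
    this.end = end;
  }

  public long getStart()
  {
    return start;
  }

  public long getEnd()
  {
    return end;
  }

  public static class Builder
  {
    private long start = -1;
    private long end = -1;

    public Builder start(long start)
    {
      this.start = start;
      return this;
    }

    public Builder end(long end)
    {
      this.end = end;
      return this;
    }

    public RangeParams build()
    {
      // Fail fast, mirroring the Preconditions checks in the parameters class.
      if (start < 0) {
        throw new IllegalArgumentException("'start' not provided or negative: " + start);
      }
      if (end < start) {
        throw new IllegalArgumentException("'start' should not be greater than 'end'");
      }
      return new RangeParams(start, end);
    }
  }
}
```

Centralizing the checks in `build()` means a half-configured object can never escape the builder, which is why the connector validates there rather than in each setter.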
diff --git 
a/processing/src/test/java/org/apache/druid/storage/remote/ChunkingStorageConnectorParametersTest.java
 
b/processing/src/test/java/org/apache/druid/storage/remote/ChunkingStorageConnectorParametersTest.java
new file mode 100644
index 0000000000..d8b879da43
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/storage/remote/ChunkingStorageConnectorParametersTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.remote;
+
+import com.google.common.base.Predicates;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+
+public class ChunkingStorageConnectorParametersTest
+{
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(ChunkingStorageConnectorParameters.class)
+                  .usingGetClass()
+                  .verify();
+  }
+
+  @Test
+  public void testIncorrectParameters()
+  {
+    ChunkingStorageConnectorParameters.Builder<Void> builder = new 
ChunkingStorageConnectorParameters.Builder<>();
+    builder.start(-1);
+    Assert.assertThrows(IllegalArgumentException.class, builder::build);
+  }
+
+  @Test
+  public void testCorrectParameters()
+  {
+    ChunkingStorageConnectorParameters.Builder<Void> builder = new 
ChunkingStorageConnectorParameters.Builder<>();
+    builder.start(0);
+    builder.end(10);
+    builder.objectSupplier((start, end) -> null);
+    builder.objectOpenFunction(obj -> null);
+    builder.maxRetry(10);
+    builder.cloudStoragePath("/path");
+    builder.retryCondition(Predicates.alwaysTrue());
+    builder.tempDirSupplier(() -> new File("/tmp"));
+    ChunkingStorageConnectorParameters<Void> parameters = builder.build();
+    Assert.assertEquals(0, parameters.getStart());
+    Assert.assertEquals(10, parameters.getEnd());
+    Assert.assertEquals(10, parameters.getMaxRetry());
+    Assert.assertEquals("/path", parameters.getCloudStoragePath());
+    Assert.assertEquals("/tmp", 
parameters.getTempDirSupplier().get().getAbsolutePath());
+  }
+}
diff --git 
a/processing/src/test/java/org/apache/druid/storage/remote/ChunkingStorageConnectorTest.java
 
b/processing/src/test/java/org/apache/druid/storage/remote/ChunkingStorageConnectorTest.java
new file mode 100644
index 0000000000..ccadab3a88
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/storage/remote/ChunkingStorageConnectorTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.remote;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.storage.StorageConnector;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+public class ChunkingStorageConnectorTest
+{
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private StorageConnector storageConnector;
+
+  @Before
+  public void setup() throws IOException
+  {
+    storageConnector = new TestStorageConnector(temporaryFolder.newFolder());
+  }
+
+  @Test
+  public void testRead() throws IOException
+  {
+    InputStream is = storageConnector.read("");
+    byte[] dataBytes = IOUtils.toByteArray(is);
+    Assert.assertEquals(TestStorageConnector.DATA, new String(dataBytes, 
StandardCharsets.UTF_8));
+  }
+
+  @Test
+  public void testReadRange() throws IOException
+  {
+
+    List<Integer> ranges = ImmutableList.of(
+        TestStorageConnector.CHUNK_SIZE_BYTES,
+        TestStorageConnector.CHUNK_SIZE_BYTES * 2,
+        TestStorageConnector.CHUNK_SIZE_BYTES * 7,
+        TestStorageConnector.CHUNK_SIZE_BYTES + 1,
+        TestStorageConnector.CHUNK_SIZE_BYTES + 2,
+        TestStorageConnector.CHUNK_SIZE_BYTES + 3
+    );
+
+    List<Integer> startPositions = ImmutableList.of(0, 25, 37, 
TestStorageConnector.DATA.length() - 10);
+
+    for (int range : ranges) {
+      for (int startPosition : startPositions) {
+        int limitedRange = startPosition + range > 
TestStorageConnector.DATA.length()
+                           ? TestStorageConnector.DATA.length() - startPosition
+                           : range;
+        InputStream is = storageConnector.readRange("", startPosition, 
limitedRange);
+        byte[] dataBytes = IOUtils.toByteArray(is);
+        Assert.assertEquals(
+            TestStorageConnector.DATA.substring(startPosition, startPosition + 
limitedRange),
+            new String(dataBytes, StandardCharsets.UTF_8)
+        );
+      }
+    }
+  }
+}
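The `limitedRange` computation in the test above clamps a requested range so it never runs past the end of the data. Written as a small standalone helper (illustrative, not part of the test suite):

```java
public class RangeClamp
{
  // Limits `size` so that [start, start + size) stays within [0, total).
  public static int clamp(int start, int size, int total)
  {
    return start + size > total ? total - start : size;
  }

  public static void main(String[] args)
  {
    System.out.println(RangeClamp.clamp(0, 4, 10)); // 4 (fits as-is)
    System.out.println(RangeClamp.clamp(8, 4, 10)); // 2 (clamped to the end)
  }
}
```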
diff --git 
a/processing/src/test/java/org/apache/druid/storage/remote/TestStorageConnector.java
 
b/processing/src/test/java/org/apache/druid/storage/remote/TestStorageConnector.java
new file mode 100644
index 0000000000..e5757a9492
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/storage/remote/TestStorageConnector.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.remote;
+
+import com.google.common.base.Predicates;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
+
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+
+public class TestStorageConnector extends ChunkingStorageConnector<TestStorageConnector.InputRange>
+{
+
+  public static final String DATA = "This is some random data text. This should be returned in chunks by the methods, "
+                                     + "however the connector should reassemble it as a single stream of text";
+
+  public static final int CHUNK_SIZE_BYTES = 4;
+
+  private final File tempDir;
+
+  public TestStorageConnector(
+      final File tempDir
+  )
+  {
+    super(CHUNK_SIZE_BYTES);
+    this.tempDir = tempDir;
+  }
+
+  @Override
+  public ChunkingStorageConnectorParameters<TestStorageConnector.InputRange> buildInputParams(String path)
+  {
+    return buildInputParams(path, 0, DATA.length());
+  }
+
+  @Override
+  public ChunkingStorageConnectorParameters<TestStorageConnector.InputRange> buildInputParams(
+      String path,
+      long from,
+      long size
+  )
+  {
+    ChunkingStorageConnectorParameters.Builder<InputRange> builder = new ChunkingStorageConnectorParameters.Builder<>();
+    builder.start(from);
+    builder.end(from + size);
+    builder.cloudStoragePath(path);
+    builder.tempDirSupplier(() -> tempDir);
+    builder.retryCondition(Predicates.alwaysFalse());
+    builder.maxRetry(2);
+    builder.objectSupplier((start, end) -> new InputRange((int) start, (int) end));
+    builder.objectOpenFunction(new ObjectOpenFunction<InputRange>()
+    {
+      @Override
+      public InputStream open(InputRange ir)
+      {
+        return IOUtils.toInputStream(DATA.substring(ir.start, ir.end), StandardCharsets.UTF_8);
+      }
+
+      @Override
+      public InputStream open(InputRange ir, long offset)
+      {
+        return open(new InputRange(ir.start + (int) offset, ir.end));
+      }
+    });
+    return builder.build();
+  }
+
+  @Override
+  public boolean pathExists(String path)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public OutputStream write(String path)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void deleteFile(String path)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void deleteFiles(Iterable<String> paths)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void deleteRecursively(String path)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Iterator<String> listDir(String dirName)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  public static class InputRange
+  {
+    private final int start;
+    private final int end;
+
+    public InputRange(int start, int end)
+    {
+      this.start = start;
+      this.end = end;
+    }
+  }
+}
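
[Editor's note] The chunked-range read exercised by `testReadRange` above can be illustrated standalone. This is a minimal sketch, not Druid's implementation: the class name `ChunkedReadSketch` and its `readRange` helper are hypothetical, and it assumes only the core idea shown in the test, that a range `[from, from + size)` is fetched in fixed-size chunks and reassembled into a single result identical to a direct read.

```java
// Hypothetical standalone sketch of the chunked-range read pattern that
// the ChunkingStorageConnector tests above verify: split [from, from + size)
// into CHUNK_SIZE_BYTES slices, fetch each slice, and reassemble the result.
public class ChunkedReadSketch
{
  static final String DATA = "This is some random data text.";
  static final int CHUNK_SIZE_BYTES = 4;

  // Reads DATA.substring(from, from + size) one chunk at a time.
  static String readRange(int from, int size)
  {
    StringBuilder out = new StringBuilder();
    int end = from + size;
    for (int start = from; start < end; start += CHUNK_SIZE_BYTES) {
      int chunkEnd = Math.min(start + CHUNK_SIZE_BYTES, end);
      out.append(DATA, start, chunkEnd); // one simulated "chunk" fetch
    }
    return out.toString();
  }

  public static void main(String[] args)
  {
    // The reassembled range must equal a direct substring of the source,
    // which is exactly the invariant testReadRange asserts.
    String got = readRange(5, 12);
    if (!got.equals(DATA.substring(5, 17))) {
      throw new AssertionError(got);
    }
    System.out.println(got);
  }
}
```

The test's `limitedRange` clamping serves the same purpose as the `Math.min` here: a requested range that runs past the end of the data is truncated rather than read out of bounds.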


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]