This is an automated email from the ASF dual-hosted git repository.
georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3e512249e37 Azure multi read options (#15630)
3e512249e37 is described below
commit 3e512249e37407c0ece2b241d4b16e448eb18f0a
Author: George Shiqi Wu <[email protected]>
AuthorDate: Thu Jan 25 13:29:16 2024 -0500
Azure multi read options (#15630)
* Include new dependencies
* Mostly implemented
* More azure fixes
* Tests passing
* Unit tests running
* Test running after removing storage exception
* Happy with coverage now
* Add more tests
* fix client factory
* cleanup from testing
* Remove old client
* update docs
* Exclude from spellcheck
* Add licenses
* Fix identity version
* Save work
* Add azure clients
* add licenses
* typos
* Add dependencies
* Exception is not thrown
* Fix intellij check
* Don't need to override
* specify length
* urldecode
* encode path
* Fix checks
* Revert urlencode changes
* Urlencode with azure library
* Update docs/development/extensions-core/azure.md
Co-authored-by: Abhishek Agarwal
<[email protected]>
* PR changes
* Update docs/development/extensions-core/azure.md
Co-authored-by: 317brian <[email protected]>
* Add config for multiple storage accounts
* Deprecate AzureTaskLogsConfig.maxRetries
* Clean up azure retry block
* logic update to reuse clients
* fix comments
* Create container conditionally
* Fix key auth
* save work
* Fix unit tests
* Revert old azure input type
* Separate input source
* save work
* Add support for app registrations
* Fix unit tests
* clean up spacing
* Add coverage
* fixes from testing
* cleanup some caching behavior
* Add docs
* Fix spelling issues
* fix more spelling errors'
* Fix intellij inspections
* add simple changes from pr
* save work on fixing bug
* Fix unit tests
* Add more testing
* Fix unit test
* Add tests
* Add annotation for azureStorage
* Fix up docs
* Add comment for list method
* Fix tests
* Remove uneeded toString
* Update docs/ingestion/input-sources.md
Co-authored-by: 317brian <[email protected]>
* Update docs/ingestion/input-sources.md
Co-authored-by: 317brian <[email protected]>
* Update docs/ingestion/input-sources.md
Co-authored-by: 317brian <[email protected]>
* Update docs/ingestion/input-sources.md
Co-authored-by: 317brian <[email protected]>
* Update docs/ingestion/input-sources.md
Co-authored-by: 317brian <[email protected]>
* Update docs/ingestion/input-sources.md
Co-authored-by: 317brian <[email protected]>
* Update docs/ingestion/input-sources.md
Co-authored-by: 317brian <[email protected]>
* Update docs/ingestion/input-sources.md
Co-authored-by: 317brian <[email protected]>
* Update docs/ingestion/input-sources.md
Co-authored-by: 317brian <[email protected]>
* PR changes
* fix injection of StorageConnector
* Fix checkstyle
* clean up unit tests
* More pr fixes
---------
Co-authored-by: Abhishek Agarwal
<[email protected]>
Co-authored-by: 317brian <[email protected]>
---
docs/ingestion/input-sources.md | 110 +++++++++++-
.../apache/druid/data/input/azure/AzureEntity.java | 17 +-
.../druid/data/input/azure/AzureEntityFactory.java | 7 +-
.../druid/data/input/azure/AzureInputSource.java | 7 +-
...ce.java => AzureStorageAccountInputSource.java} | 101 +++++++----
.../AzureStorageAccountInputSourceConfig.java | 138 +++++++++++++++
.../druid/storage/azure/AzureByteSource.java | 2 +-
.../storage/azure/AzureByteSourceFactory.java | 6 +-
.../druid/storage/azure/AzureClientFactory.java | 41 +++--
.../storage/azure/AzureCloudBlobIterable.java | 7 +-
.../azure/AzureCloudBlobIterableFactory.java | 2 +-
.../storage/azure/AzureCloudBlobIterator.java | 28 ++-
.../azure/AzureCloudBlobIteratorFactory.java | 2 +-
.../storage/azure/AzureDataSegmentKiller.java | 3 +-
.../storage/azure/AzureDataSegmentPuller.java | 6 +-
.../storage/azure/AzureDataSegmentPusher.java | 3 +-
.../storage/azure/AzureIngestClientFactory.java | 78 +++++++++
.../apache/druid/storage/azure/AzureStorage.java | 23 ++-
.../storage/azure/AzureStorageDruidModule.java | 15 +-
.../apache/druid/storage/azure/AzureTaskLogs.java | 3 +-
.../org/apache/druid/storage/azure/AzureUtils.java | 2 +-
.../druid/storage/azure/blob/CloudBlobHolder.java | 9 +-
.../output/AzureStorageConnectorProvider.java | 2 +
.../druid/data/input/azure/AzureEntityTest.java | 70 ++++++--
.../input/azure/AzureInputSourceSerdeTest.java | 6 +-
.../data/input/azure/AzureInputSourceTest.java | 15 +-
...ava => AzureStorageAccountInputSourceTest.java} | 164 +++++++++++-------
.../storage/azure/AzureClientFactoryTest.java | 25 +--
.../storage/azure/AzureCloudBlobIteratorTest.java | 142 ++++++++++++++-
.../storage/azure/AzureDataSegmentKillerTest.java | 7 +-
.../storage/azure/AzureDataSegmentPullerTest.java | 16 +-
.../azure/AzureIngestClientFactoryTest.java | 190 +++++++++++++++++++++
.../storage/azure/AzureStorageDruidModuleTest.java | 27 +--
.../druid/storage/azure/AzureStorageTest.java | 33 +++-
.../druid/storage/azure/AzureTaskLogsTest.java | 14 +-
.../apache/druid/storage/azure/AzureTestUtils.java | 6 +-
website/.spelling | 3 +
37 files changed, 1112 insertions(+), 218 deletions(-)
diff --git a/docs/ingestion/input-sources.md b/docs/ingestion/input-sources.md
index 3e11734ee55..8e70ed8b0ef 100644
--- a/docs/ingestion/input-sources.md
+++ b/docs/ingestion/input-sources.md
@@ -309,6 +309,112 @@ Google Cloud Storage object:
The Azure input source reads objects directly from Azure Blob store or Azure
Data Lake sources. You can
specify objects as a list of file URI strings or prefixes. You can split the
Azure input source for use with [Parallel task](./native-batch.md) indexing and
each worker task reads one chunk of the split data.
+
+:::info
+The old `azure` schema is deprecated. Update your specs to use the
`azureStorage` schema described below instead.
+:::
+
+Sample specs:
+
+```json
+...
+ "ioConfig": {
+ "type": "index_parallel",
+ "inputSource": {
+ "type": "azureStorage",
+ "objectGlob": "**.json",
+ "uris": ["azureStorage://storageAccount/container/prefix1/file.json",
"azureStorage://storageAccount/container/prefix2/file2.json"]
+ },
+ "inputFormat": {
+ "type": "json"
+ },
+ ...
+ },
+...
+```
+
+```json
+...
+ "ioConfig": {
+ "type": "index_parallel",
+ "inputSource": {
+ "type": "azureStorage",
+ "objectGlob": "**.parquet",
+ "prefixes": ["azureStorage://storageAccount/container/prefix1/",
"azureStorage://storageAccount/container/prefix2/"]
+ },
+ "inputFormat": {
+ "type": "json"
+ },
+ ...
+ },
+...
+```
+
+
+```json
+...
+ "ioConfig": {
+ "type": "index_parallel",
+ "inputSource": {
+ "type": "azureStorage",
+ "objectGlob": "**.json",
+ "objects": [
+ { "bucket": "storageAccount", "path":
"container/prefix1/file1.json"},
+ { "bucket": "storageAccount", "path": "container/prefix2/file2.json"}
+ ],
+ "properties": {
+ "sharedAccessStorageToken": "?sv=...<storage token secret>..."
+ }
+ },
+ "inputFormat": {
+ "type": "json"
+ },
+ ...
+ },
+...
+```
+
+|Property|Description|Default|Required|
+|--------|-----------|-------|---------|
+|type|Set the value to `azureStorage`.|None|yes|
+|uris|JSON array of URIs where the Azure objects to be ingested are located.
Use this format:
`azureStorage://STORAGE_ACCOUNT/CONTAINER/PATH_TO_FILE`|None|One of the
following must be set: `uris`, `prefixes`, or `objects`.|
+|prefixes|JSON array of URI prefixes for the locations of Azure objects to
ingest. Use this format: `azureStorage://STORAGE_ACCOUNT/CONTAINER/PREFIX`. Empty
objects starting with any of the given prefixes are skipped.|None|One of the
following must be set: `uris`, `prefixes`, or `objects`.|
+|objects|JSON array of Azure objects to ingest.|None|One of the following must
be set: `uris`, `prefixes`, or `objects`.|
+|objectGlob|A glob for the object part of the Azure URI. In the URI
`azureStorage://foo/bar/file.json`, the glob is applied to `bar/file.json`.<br
/><br />The glob must match the entire object part, not just the filename. For
example, the glob `*.json` does not match `azureStorage://foo/bar/file.json`
because the object part is `bar/file.json`, and the `*` does not match the
slash. To match all objects ending in `.json`, use `**.json` instead.<br /><br
/>For more information, refer to the [...]
+|systemFields|JSON array of system fields to return as part of input rows.
Possible values: `__file_uri` (Azure blob URI starting with `azureStorage://`),
`__file_bucket` (Azure bucket), and `__file_path` (Azure object path).|None|no|
+|properties|Properties object for overriding the default Azure configuration.
See below for more information.|None|No (defaults will be used if not given)|
+
+Note that the Azure input source skips all empty objects only when `prefixes`
is specified.
+
+Each entry in the `objects` array has the following properties:
+
+|Property|Description|Default|Required|
+|--------|-----------|-------|---------|
+|bucket|Name of the Azure Blob Storage or Azure Data Lake storage
account|None|yes|
+|path|The container and path where data is located.|None|yes|
+
+
+The `properties` property can be one of the following:
+
+- `sharedAccessStorageToken`
+- `key`
+- `appRegistrationClientId`, `appRegistrationClientSecret`, and `tenantId`
+- empty
+
+
+|Property|Description|Default|Required|
+|--------|-----------|-------|---------|
+|sharedAccessStorageToken|The plain text string of this Azure Blob Storage
Shared Access Token|None|No|
+|key|The root key of Azure Blob Storage Account|None|no|
+|appRegistrationClientId|The client ID of the Azure App registration to
authenticate as|None|No|
+|appRegistrationClientSecret|The client secret of the Azure App registration
to authenticate as|None|Yes if `appRegistrationClientId` is provided|
+|tenantId|The tenant ID of the Azure App registration to authenticate
as|None|Yes if `appRegistrationClientId` is provided|
+
+<details closed>
+ <summary>Show the deprecated 'azure' input source</summary>
+
+Note that the deprecated `azure` input source doesn't support specifying which
storage account to ingest from. We recommend using the `azureStorage` input source instead.
+
Sample specs:
```json
@@ -372,7 +478,7 @@ Sample specs:
|uris|JSON array of URIs where the Azure objects to be ingested are located,
in the form `azure://<container>/<path-to-file>`|None|`uris` or `prefixes` or
`objects` must be set|
|prefixes|JSON array of URI prefixes for the locations of Azure objects to
ingest, in the form `azure://<container>/<prefix>`. Empty objects starting with
one of the given prefixes are skipped.|None|`uris` or `prefixes` or `objects`
must be set|
|objects|JSON array of Azure objects to ingest.|None|`uris` or `prefixes` or
`objects` must be set|
-|objectGlob|A glob for the object part of the S3 URI. In the URI
`s3://foo/bar/file.json`, the glob is applied to `bar/file.json`.<br /><br
/>The glob must match the entire object part, not just the filename. For
example, the glob `*.json` does not match `s3://foo/bar/file.json`, because the
object part is `bar/file.json`, and the`*` does not match the slash. To match
all objects ending in `.json`, use `**.json` instead.<br /><br />For more
information, refer to the documentation for [`F [...]
+|objectGlob|A glob for the object part of the Azure URI. In the URI
`azure://foo/bar/file.json`, the glob is applied to `bar/file.json`.<br /><br
/>The glob must match the entire object part, not just the filename. For
example, the glob `*.json` does not match `azure://foo/bar/file.json`, because
the object part is `bar/file.json`, and the `*` does not match the slash. To
match all objects ending in `.json`, use `**.json` instead.<br /><br />For more
information, refer to the documentatio [...]
|systemFields|JSON array of system fields to return as part of input rows.
Possible values: `__file_uri` (Azure blob URI starting with `azure://`),
`__file_bucket` (Azure bucket), and `__file_path` (Azure object path).|None|no|
Note that the Azure input source skips all empty objects only when `prefixes`
is specified.
@@ -384,6 +490,8 @@ The `objects` property is:
|bucket|Name of the Azure Blob Storage or Azure Data Lake container|None|yes|
|path|The path where data is located.|None|yes|
+</details>
+
## HDFS input source
:::info
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureEntity.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureEntity.java
index fc04b7b710f..31439c91fbb 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureEntity.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureEntity.java
@@ -24,8 +24,10 @@ import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import org.apache.druid.data.input.RetryingInputEntity;
import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.java.util.common.Pair;
import org.apache.druid.storage.azure.AzureByteSource;
import org.apache.druid.storage.azure.AzureByteSourceFactory;
+import org.apache.druid.storage.azure.AzureStorage;
import org.apache.druid.storage.azure.AzureUtils;
import javax.annotation.Nonnull;
@@ -40,21 +42,32 @@ public class AzureEntity extends RetryingInputEntity
{
private final CloudObjectLocation location;
private final AzureByteSource byteSource;
+ private final String scheme;
@AssistedInject
AzureEntity(
@Nonnull @Assisted CloudObjectLocation location,
+ @Nonnull @Assisted AzureStorage azureStorage,
+ @Nonnull @Assisted String scheme,
@Nonnull AzureByteSourceFactory byteSourceFactory
+
)
{
this.location = location;
- this.byteSource = byteSourceFactory.create(location.getBucket(),
location.getPath());
+ this.scheme = scheme;
+ // If scheme is azureStorage, containerName is the first prefix in the
path, otherwise containerName is the bucket
+ if (AzureStorageAccountInputSource.SCHEME.equals(this.scheme)) {
+ Pair<String, String> locationInfo =
AzureStorageAccountInputSource.getContainerAndPathFromObjectLocation(location);
+ this.byteSource = byteSourceFactory.create(locationInfo.lhs,
locationInfo.rhs, azureStorage);
+ } else {
+ this.byteSource = byteSourceFactory.create(location.getBucket(),
location.getPath(), azureStorage);
+ }
}
@Override
public URI getUri()
{
- return location.toUri(AzureInputSource.SCHEME);
+ return location.toUri(this.scheme);
}
@Override
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureEntityFactory.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureEntityFactory.java
index 86f64a1f986..b75fab78078 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureEntityFactory.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureEntityFactory.java
@@ -20,11 +20,16 @@
package org.apache.druid.data.input.azure;
import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.storage.azure.AzureStorage;
/**
* Factory for creating {@link AzureEntity} objects
*/
public interface AzureEntityFactory
{
- AzureEntity create(CloudObjectLocation location);
+ AzureEntity create(
+ CloudObjectLocation location,
+ AzureStorage azureStorage,
+ String scheme
+ );
}
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
index 8cb6ff14976..339ec18ec3a 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
@@ -35,6 +35,7 @@ import
org.apache.druid.data.input.impl.CloudObjectSplitWidget;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.data.input.impl.systemfield.SystemField;
import org.apache.druid.data.input.impl.systemfield.SystemFields;
+import org.apache.druid.guice.annotations.Global;
import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
import org.apache.druid.storage.azure.AzureInputDataConfig;
import org.apache.druid.storage.azure.AzureStorage;
@@ -63,7 +64,7 @@ public class AzureInputSource extends CloudObjectInputSource
@JsonCreator
public AzureInputSource(
- @JacksonInject AzureStorage storage,
+ @JacksonInject @Global AzureStorage storage,
@JacksonInject AzureEntityFactory entityFactory,
@JacksonInject AzureCloudBlobIterableFactory
azureCloudBlobIterableFactory,
@JacksonInject AzureInputDataConfig inputDataConfig,
@@ -128,7 +129,7 @@ public class AzureInputSource extends CloudObjectInputSource
@Override
protected AzureEntity createEntity(CloudObjectLocation location)
{
- return entityFactory.create(location);
+ return entityFactory.create(location, storage, SCHEME);
}
@Override
@@ -140,7 +141,7 @@ public class AzureInputSource extends CloudObjectInputSource
public Iterator<LocationWithSize>
getDescriptorIteratorForPrefixes(List<URI> prefixes)
{
return Iterators.transform(
- azureCloudBlobIterableFactory.create(getPrefixes(),
inputDataConfig.getMaxListingLength()).iterator(),
+ azureCloudBlobIterableFactory.create(getPrefixes(),
inputDataConfig.getMaxListingLength(), storage).iterator(),
blob -> {
try {
return new LocationWithSize(
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSource.java
similarity index 59%
copy from
extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
copy to
extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSource.java
index 8cb6ff14976..f4f59325306 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSource.java
@@ -24,6 +24,7 @@ import com.azure.storage.blob.specialized.BlockBlobClient;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
@@ -35,7 +36,10 @@ import
org.apache.druid.data.input.impl.CloudObjectSplitWidget;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.data.input.impl.systemfield.SystemField;
import org.apache.druid.data.input.impl.systemfield.SystemFields;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.storage.azure.AzureAccountConfig;
import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
+import org.apache.druid.storage.azure.AzureIngestClientFactory;
import org.apache.druid.storage.azure.AzureInputDataConfig;
import org.apache.druid.storage.azure.AzureStorage;
@@ -52,36 +56,45 @@ import java.util.Set;
* Abstracts the Azure storage system where input data is stored. Allows users
to retrieve entities in
* the storage system that match either a particular uri, prefix, or object.
*/
-public class AzureInputSource extends CloudObjectInputSource
+public class AzureStorageAccountInputSource extends CloudObjectInputSource
{
- public static final String SCHEME = "azure";
+ public static final String SCHEME = "azureStorage";
- private final AzureStorage storage;
private final AzureEntityFactory entityFactory;
private final AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
private final AzureInputDataConfig inputDataConfig;
+ private final AzureStorageAccountInputSourceConfig
azureStorageAccountInputSourceConfig;
+ private final AzureAccountConfig azureAccountConfig;
+
+ private final AzureIngestClientFactory azureIngestClientFactory;
@JsonCreator
- public AzureInputSource(
- @JacksonInject AzureStorage storage,
+ public AzureStorageAccountInputSource(
@JacksonInject AzureEntityFactory entityFactory,
@JacksonInject AzureCloudBlobIterableFactory
azureCloudBlobIterableFactory,
@JacksonInject AzureInputDataConfig inputDataConfig,
+ @JacksonInject AzureAccountConfig azureAccountConfig,
@JsonProperty("uris") @Nullable List<URI> uris,
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
@JsonProperty("objectGlob") @Nullable String objectGlob,
+ @JsonProperty("properties") @Nullable
AzureStorageAccountInputSourceConfig azureStorageAccountInputSourceConfig,
@JsonProperty(SYSTEM_FIELDS_PROPERTY) @Nullable SystemFields systemFields
)
{
super(SCHEME, uris, prefixes, objects, objectGlob, systemFields);
- this.storage = Preconditions.checkNotNull(storage, "AzureStorage");
this.entityFactory = Preconditions.checkNotNull(entityFactory,
"AzureEntityFactory");
this.azureCloudBlobIterableFactory = Preconditions.checkNotNull(
azureCloudBlobIterableFactory,
"AzureCloudBlobIterableFactory"
);
this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig,
"AzureInputDataConfig");
+ this.azureStorageAccountInputSourceConfig =
azureStorageAccountInputSourceConfig;
+ this.azureAccountConfig = azureAccountConfig;
+ this.azureIngestClientFactory = new AzureIngestClientFactory(
+ azureAccountConfig,
+ azureStorageAccountInputSourceConfig
+ );
}
@JsonIgnore
@@ -95,15 +108,16 @@ public class AzureInputSource extends
CloudObjectInputSource
@Override
public SplittableInputSource<List<CloudObjectLocation>>
withSplit(InputSplit<List<CloudObjectLocation>> split)
{
- return new AzureInputSource(
- storage,
+ return new AzureStorageAccountInputSource(
entityFactory,
azureCloudBlobIterableFactory,
inputDataConfig,
+ azureAccountConfig,
null,
null,
split.get(),
getObjectGlob(),
+ azureStorageAccountInputSourceConfig,
systemFields
);
}
@@ -111,15 +125,15 @@ public class AzureInputSource extends
CloudObjectInputSource
@Override
public Object getSystemFieldValue(InputEntity entity, SystemField field)
{
- final AzureEntity googleEntity = (AzureEntity) entity;
+ final AzureEntity azureEntity = (AzureEntity) entity;
switch (field) {
case URI:
- return googleEntity.getUri().toString();
+ return azureEntity.getUri().toString();
case BUCKET:
- return googleEntity.getLocation().getBucket();
+ return azureEntity.getLocation().getBucket();
case PATH:
- return googleEntity.getLocation().getPath();
+ return azureEntity.getLocation().getPath();
default:
return null;
}
@@ -128,7 +142,11 @@ public class AzureInputSource extends
CloudObjectInputSource
@Override
protected AzureEntity createEntity(CloudObjectLocation location)
{
- return entityFactory.create(location);
+ return entityFactory.create(
+ location,
+ new AzureStorage(azureIngestClientFactory, location.getBucket()),
+ SCHEME
+ );
}
@Override
@@ -140,12 +158,12 @@ public class AzureInputSource extends
CloudObjectInputSource
public Iterator<LocationWithSize>
getDescriptorIteratorForPrefixes(List<URI> prefixes)
{
return Iterators.transform(
- azureCloudBlobIterableFactory.create(getPrefixes(),
inputDataConfig.getMaxListingLength()).iterator(),
+ azureCloudBlobIterableFactory.create(prefixes,
inputDataConfig.getMaxListingLength(), new
AzureStorage(azureIngestClientFactory, null)).iterator(),
blob -> {
try {
return new LocationWithSize(
- blob.getContainerName(),
- blob.getName(),
+ blob.getStorageAccount(),
+ blob.getContainerName() + "/" + blob.getName(),
blob.getBlobLength()
);
}
@@ -160,9 +178,11 @@ public class AzureInputSource extends
CloudObjectInputSource
public long getObjectSize(CloudObjectLocation location)
{
try {
- final BlockBlobClient blobWithAttributes =
storage.getBlockBlobReferenceWithAttributes(
- location.getBucket(),
- location.getPath()
+ AzureStorage azureStorage = new
AzureStorage(azureIngestClientFactory, location.getBucket());
+ Pair<String, String> locationInfo =
getContainerAndPathFromObjectLocation(location);
+ final BlockBlobClient blobWithAttributes =
azureStorage.getBlockBlobReferenceWithAttributes(
+ locationInfo.lhs,
+ locationInfo.rhs
);
return blobWithAttributes.getProperties().getBlobSize();
@@ -176,6 +196,14 @@ public class AzureInputSource extends
CloudObjectInputSource
return new SplitWidget();
}
+ @Nullable
+ @JsonProperty("properties")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public AzureStorageAccountInputSourceConfig
getAzureStorageAccountInputSourceConfig()
+ {
+ return azureStorageAccountInputSourceConfig;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -188,28 +216,37 @@ public class AzureInputSource extends
CloudObjectInputSource
if (!super.equals(o)) {
return false;
}
- AzureInputSource that = (AzureInputSource) o;
- return storage.equals(that.storage) &&
- entityFactory.equals(that.entityFactory) &&
-
azureCloudBlobIterableFactory.equals(that.azureCloudBlobIterableFactory) &&
- inputDataConfig.equals(that.inputDataConfig);
+ AzureStorageAccountInputSource that = (AzureStorageAccountInputSource) o;
+ return inputDataConfig.equals(that.inputDataConfig) &&
+
azureStorageAccountInputSourceConfig.equals(that.azureStorageAccountInputSourceConfig)
&&
+ azureAccountConfig.equals(that.azureAccountConfig) &&
+ azureIngestClientFactory.equals(that.azureIngestClientFactory);
}
@Override
public int hashCode()
{
- return Objects.hash(super.hashCode(), storage, entityFactory,
azureCloudBlobIterableFactory, inputDataConfig);
+ return Objects.hash(super.hashCode(), inputDataConfig,
azureStorageAccountInputSourceConfig, azureAccountConfig,
azureIngestClientFactory);
}
@Override
public String toString()
{
- return "AzureInputSource{" +
- "uris=" + getUris() +
- ", prefixes=" + getPrefixes() +
- ", objects=" + getObjects() +
- ", objectGlob=" + getObjectGlob() +
- (systemFields.getFields().isEmpty() ? "" : ", systemFields=" +
systemFields) +
- '}';
+ return "AzureStorageAccountInputSource{" +
+ "uris=" + getUris() +
+ ", prefixes=" + getPrefixes() +
+ ", objects=" + getObjects() +
+ ", objectGlob=" + getObjectGlob() +
+ ", azureStorageAccountInputSourceConfig=" +
getAzureStorageAccountInputSourceConfig() +
+ (systemFields.getFields().isEmpty() ? "" : ", systemFields=" +
systemFields) +
+ '}';
+ }
+
+ // Returns a <containerName, path> pair given a location using the
azureStorage schema.
+ public static Pair<String, String>
getContainerAndPathFromObjectLocation(CloudObjectLocation location)
+ {
+ String[] pathParts = location.getPath().split("/", 2);
+ // If there is no path specified, use an empty path as azure will throw an
exception that is clearer than an index error.
+ return Pair.of(pathParts[0], pathParts.length == 2 ? pathParts[1] : "");
}
}
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSourceConfig.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSourceConfig.java
new file mode 100644
index 00000000000..4a2d8d0cb97
--- /dev/null
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSourceConfig.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.azure;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * Contains properties for Azure input source.
+ * Properties can be specified by ingestionSpec which will override system
default.
+ */
+public class AzureStorageAccountInputSourceConfig
+{
+ private final String sharedAccessStorageToken;
+ private final String key;
+ private final String appRegistrationClientId;
+ private final String appRegistrationClientSecret;
+ private final String tenantId;
+
+ @JsonCreator
+ public AzureStorageAccountInputSourceConfig(
+ @JsonProperty("sharedAccessStorageToken") @Nullable String
sharedAccessStorageToken,
+ @JsonProperty("key") @Nullable String key,
+ @JsonProperty("appRegistrationClientId") @Nullable String
appRegistrationClientId,
+ @JsonProperty("appRegistrationClientSecret") @Nullable String
appRegistrationClientSecret,
+ @JsonProperty("tenantId") @Nullable String tenantId
+
+ )
+ {
+ this.sharedAccessStorageToken = sharedAccessStorageToken;
+ this.key = key;
+ this.appRegistrationClientId = appRegistrationClientId;
+ this.appRegistrationClientSecret = appRegistrationClientSecret;
+ this.tenantId = tenantId;
+ }
+
+ @Nullable
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public String getSharedAccessStorageToken()
+ {
+ return sharedAccessStorageToken;
+ }
+
+ @Nullable
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public String getAppRegistrationClientId()
+ {
+ return appRegistrationClientId;
+ }
+
+ @Nullable
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public String getAppRegistrationClientSecret()
+ {
+ return appRegistrationClientSecret;
+ }
+
+ @Nullable
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public String getTenantId()
+ {
+ return tenantId;
+ }
+
+ @Nullable
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public String getKey()
+ {
+ return key;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "AzureInputSourceConfig{" +
+ "sharedAccessStorageToken=" + sharedAccessStorageToken +
+ ", key=" + key +
+ ", appRegistrationClientId=" + appRegistrationClientId +
+ ", appRegistrationClientSecret=" + appRegistrationClientSecret +
+ ", tenantId=" + tenantId +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ AzureStorageAccountInputSourceConfig that =
(AzureStorageAccountInputSourceConfig) o;
+ return Objects.equals(key, that.key)
+ && Objects.equals(sharedAccessStorageToken,
that.sharedAccessStorageToken)
+ && Objects.equals(appRegistrationClientId,
that.appRegistrationClientId)
+ && Objects.equals(appRegistrationClientSecret,
that.appRegistrationClientSecret)
+ && Objects.equals(tenantId, that.tenantId);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(
+ sharedAccessStorageToken,
+ key,
+ appRegistrationClientId,
+ appRegistrationClientSecret,
+ tenantId
+ );
+ }
+}
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSource.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSource.java
index dfbf6cb3385..537c0b60d37 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSource.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSource.java
@@ -40,7 +40,7 @@ public class AzureByteSource extends ByteSource
@AssistedInject
public AzureByteSource(
- AzureStorage azureStorage,
+ @Assisted("azureStorage") AzureStorage azureStorage,
@Assisted("containerName") String containerName,
@Assisted("blobPath") String blobPath
)
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSourceFactory.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSourceFactory.java
index 249e096b4c6..be3e5297e54 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSourceFactory.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSourceFactory.java
@@ -26,5 +26,9 @@ import com.google.inject.assistedinject.Assisted;
*/
public interface AzureByteSourceFactory
{
- AzureByteSource create(@Assisted("containerName") String containerName,
@Assisted("blobPath") String blobPath);
+ AzureByteSource create(
+ @Assisted("containerName") String containerName,
+ @Assisted("blobPath") String blobPath,
+ @Assisted("azureStorage") AzureStorage azureStorage
+ );
}
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java
index d72fcd8395f..3c8d8e27de2 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java
@@ -28,7 +28,9 @@ import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.batch.BlobBatchClient;
import com.azure.storage.blob.batch.BlobBatchClientBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;
+import org.apache.druid.java.util.common.Pair;
+import javax.annotation.Nullable;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
@@ -39,8 +41,8 @@ import java.util.Map;
public class AzureClientFactory
{
- private final AzureAccountConfig config;
- private final Map<Integer, BlobServiceClient> cachedBlobServiceClients;
+ protected final AzureAccountConfig config;
+ private final Map<Pair<String, Integer>, BlobServiceClient>
cachedBlobServiceClients;
public AzureClientFactory(AzureAccountConfig config)
{
@@ -49,38 +51,26 @@ public class AzureClientFactory
}
// It's okay to store clients in a map here because all the configs for
specifying azure retries are static, and there are only 2 of them.
- // The 2 configs are AzureAccountConfig.maxTries and
AzureOutputConfig.maxRetrr.
- // We will only ever have at most 2 clients in cachedBlobServiceClients.
- public BlobServiceClient getBlobServiceClient(Integer retryCount)
+ // The 2 configs are AzureAccountConfig.maxTries and
AzureOutputConfig.maxRetry.
+ // We will only ever have at most 2 clients in cachedBlobServiceClients per
storage account.
+ public BlobServiceClient getBlobServiceClient(@Nullable Integer retryCount,
String storageAccount)
{
- if (!cachedBlobServiceClients.containsKey(retryCount)) {
- BlobServiceClientBuilder clientBuilder =
getAuthenticatedBlobServiceClientBuilder()
- .retryOptions(new RetryOptions(
- new ExponentialBackoffOptions()
- .setMaxRetries(retryCount != null ? retryCount :
config.getMaxTries())
- .setBaseDelay(Duration.ofMillis(1000))
- .setMaxDelay(Duration.ofMillis(60000))
- ));
- cachedBlobServiceClients.put(retryCount, clientBuilder.buildClient());
- }
-
- return cachedBlobServiceClients.get(retryCount);
+ return cachedBlobServiceClients.computeIfAbsent(Pair.of(storageAccount,
retryCount != null ? retryCount : config.getMaxTries()), key ->
buildNewClient(key.rhs, key.lhs));
}
-
// Mainly here to make testing easier.
public BlobBatchClient getBlobBatchClient(BlobContainerClient
blobContainerClient)
{
return new BlobBatchClientBuilder(blobContainerClient).buildClient();
}
- private BlobServiceClientBuilder getAuthenticatedBlobServiceClientBuilder()
+ protected BlobServiceClient buildNewClient(Integer retryCount, String
storageAccount)
{
BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder()
- .endpoint("https://" + config.getAccount() + "." +
config.getBlobStorageEndpoint());
+ .endpoint("https://" + storageAccount + "." +
config.getBlobStorageEndpoint());
if (config.getKey() != null) {
- clientBuilder.credential(new
StorageSharedKeyCredential(config.getAccount(), config.getKey()));
+ clientBuilder.credential(new StorageSharedKeyCredential(storageAccount,
config.getKey()));
} else if (config.getSharedAccessStorageToken() != null) {
clientBuilder.sasToken(config.getSharedAccessStorageToken());
} else if (config.getUseAzureCredentialsChain()) {
@@ -89,6 +79,13 @@ public class AzureClientFactory
.managedIdentityClientId(config.getManagedIdentityClientId());
clientBuilder.credential(defaultAzureCredentialBuilder.build());
}
- return clientBuilder;
+ return clientBuilder
+ .retryOptions(new RetryOptions(
+ new ExponentialBackoffOptions()
+ .setMaxRetries(retryCount)
+ .setBaseDelay(Duration.ofMillis(1000))
+ .setMaxDelay(Duration.ofMillis(60000))
+ ))
+ .buildClient();
}
}
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterable.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterable.java
index 55c51e58bb8..1900a46c4d4 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterable.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterable.java
@@ -34,22 +34,25 @@ public class AzureCloudBlobIterable implements
Iterable<CloudBlobHolder>
private final Iterable<URI> prefixes;
private final int maxListingLength;
private final AzureCloudBlobIteratorFactory azureCloudBlobIteratorFactory;
+ private final AzureStorage azureStorage;
@AssistedInject
public AzureCloudBlobIterable(
AzureCloudBlobIteratorFactory azureCloudBlobIteratorFactory,
@Assisted final Iterable<URI> prefixes,
- @Assisted final int maxListingLength
+ @Assisted final int maxListingLength,
+ @Assisted final AzureStorage azureStorage
)
{
this.azureCloudBlobIteratorFactory = azureCloudBlobIteratorFactory;
this.prefixes = prefixes;
this.maxListingLength = maxListingLength;
+ this.azureStorage = azureStorage;
}
@Override
public Iterator<CloudBlobHolder> iterator()
{
- return azureCloudBlobIteratorFactory.create(prefixes, maxListingLength);
+ return azureCloudBlobIteratorFactory.create(prefixes, maxListingLength,
azureStorage);
}
}
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterableFactory.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterableFactory.java
index ee038e2c06a..43eac7c76bf 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterableFactory.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterableFactory.java
@@ -26,5 +26,5 @@ import java.net.URI;
*/
public interface AzureCloudBlobIterableFactory
{
- AzureCloudBlobIterable create(Iterable<URI> prefixes, int maxListingLength);
+ AzureCloudBlobIterable create(Iterable<URI> prefixes, int maxListingLength,
AzureStorage azureStorage);
}
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java
index 95ca3c08ae8..d57238ccb3a 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java
@@ -22,6 +22,9 @@ package org.apache.druid.storage.azure;
import com.azure.storage.blob.models.BlobItem;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
+import org.apache.druid.data.input.azure.AzureStorageAccountInputSource;
+import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.storage.azure.blob.CloudBlobHolder;
@@ -37,9 +40,10 @@ import java.util.NoSuchElementException;
public class AzureCloudBlobIterator implements Iterator<CloudBlobHolder>
{
private static final Logger log = new Logger(AzureCloudBlobIterator.class);
- private final AzureStorage storage;
private final Iterator<URI> prefixesIterator;
private final int maxListingLength;
+ private AzureStorage storage;
+ private String currentStorageAccount;
private String currentContainer;
private String currentPrefix;
private CloudBlobHolder currentBlobItem;
@@ -48,16 +52,17 @@ public class AzureCloudBlobIterator implements
Iterator<CloudBlobHolder>
@AssistedInject
AzureCloudBlobIterator(
- AzureStorage storage,
+ @Assisted AzureStorage azureStorage,
AzureAccountConfig config,
@Assisted final Iterable<URI> prefixes,
@Assisted final int maxListingLength
)
{
- this.storage = storage;
+ this.storage = azureStorage;
this.config = config;
this.prefixesIterator = prefixes.iterator();
this.maxListingLength = maxListingLength;
+ this.currentStorageAccount = null;
this.currentContainer = null;
this.currentPrefix = null;
this.currentBlobItem = null;
@@ -91,8 +96,18 @@ public class AzureCloudBlobIterator implements
Iterator<CloudBlobHolder>
private void prepareNextRequest()
{
URI currentUri = prefixesIterator.next();
- currentContainer = currentUri.getAuthority();
- currentPrefix = AzureUtils.extractAzureKey(currentUri);
+
+ if (currentUri.getScheme().equals(AzureStorageAccountInputSource.SCHEME)) {
+ CloudObjectLocation cloudObjectLocation = new
CloudObjectLocation(currentUri);
+ Pair<String, String> containerInfo =
AzureStorageAccountInputSource.getContainerAndPathFromObjectLocation(cloudObjectLocation);
+ currentStorageAccount = cloudObjectLocation.getBucket();
+ currentContainer = containerInfo.lhs;
+ currentPrefix = containerInfo.rhs;
+ } else {
+ currentStorageAccount = config.getAccount();
+ currentContainer = currentUri.getAuthority();
+ currentPrefix = AzureUtils.extractAzureKey(currentUri);
+ }
log.debug("currentUri: %s\ncurrentContainer: %s\ncurrentPrefix: %s",
currentUri, currentContainer, currentPrefix
);
@@ -109,6 +124,7 @@ public class AzureCloudBlobIterator implements
Iterator<CloudBlobHolder>
);
// We don't need to iterate by page because the client handles this, it
will fetch the next page when necessary.
blobItemIterator = storage.listBlobsWithPrefixInContainerSegmented(
+ currentStorageAccount,
currentContainer,
currentPrefix,
maxListingLength,
@@ -135,7 +151,7 @@ public class AzureCloudBlobIterator implements
Iterator<CloudBlobHolder>
while (blobItemIterator.hasNext()) {
BlobItem blobItem = blobItemIterator.next();
if (!blobItem.isPrefix() &&
blobItem.getProperties().getContentLength() > 0) {
- currentBlobItem = new CloudBlobHolder(blobItem, currentContainer);
+ currentBlobItem = new CloudBlobHolder(blobItem, currentContainer,
currentStorageAccount);
return;
}
}
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorFactory.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorFactory.java
index 0197c96e247..da17bcb6ff9 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorFactory.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorFactory.java
@@ -26,5 +26,5 @@ import java.net.URI;
*/
public interface AzureCloudBlobIteratorFactory
{
- AzureCloudBlobIterator create(Iterable<URI> prefixes, int maxListingLength);
+ AzureCloudBlobIterator create(Iterable<URI> prefixes, int maxListingLength,
AzureStorage azureStorage);
}
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java
index 0b8ccf6daee..5429b845ef6 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java
@@ -22,6 +22,7 @@ package org.apache.druid.storage.azure;
import com.azure.storage.blob.models.BlobStorageException;
import com.google.common.base.Predicates;
import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Global;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.logger.Logger;
@@ -51,7 +52,7 @@ public class AzureDataSegmentKiller implements
DataSegmentKiller
AzureDataSegmentConfig segmentConfig,
AzureInputDataConfig inputDataConfig,
AzureAccountConfig accountConfig,
- final AzureStorage azureStorage,
+ @Global final AzureStorage azureStorage,
AzureCloudBlobIterableFactory azureCloudBlobIterableFactory
)
{
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPuller.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPuller.java
index c20413b1169..44094daade6 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPuller.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPuller.java
@@ -21,6 +21,7 @@ package org.apache.druid.storage.azure;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Global;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.SegmentLoadingException;
@@ -37,16 +38,19 @@ public class AzureDataSegmentPuller
private static final Logger log = new Logger(AzureDataSegmentPuller.class);
private final AzureByteSourceFactory byteSourceFactory;
+ private final AzureStorage azureStorage;
private final AzureAccountConfig azureAccountConfig;
@Inject
public AzureDataSegmentPuller(
AzureByteSourceFactory byteSourceFactory,
+ @Global AzureStorage azureStorage,
AzureAccountConfig azureAccountConfig
)
{
this.byteSourceFactory = byteSourceFactory;
+ this.azureStorage = azureStorage;
this.azureAccountConfig = azureAccountConfig;
}
@@ -66,7 +70,7 @@ public class AzureDataSegmentPuller
final String actualBlobPath =
AzureUtils.maybeRemoveAzurePathPrefix(blobPath,
azureAccountConfig.getBlobStorageEndpoint());
- final ByteSource byteSource = byteSourceFactory.create(containerName,
actualBlobPath);
+ final ByteSource byteSource = byteSourceFactory.create(containerName,
actualBlobPath, azureStorage);
final FileUtils.FileCopyResult result = CompressionUtils.unzip(
byteSource,
outDir,
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java
index 8180b362b66..1b48d10c315 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java
@@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Global;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.SegmentUtils;
@@ -51,7 +52,7 @@ public class AzureDataSegmentPusher implements
DataSegmentPusher
@Inject
public AzureDataSegmentPusher(
- AzureStorage azureStorage,
+ @Global AzureStorage azureStorage,
AzureAccountConfig accountConfig,
AzureDataSegmentConfig segmentConfig
)
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureIngestClientFactory.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureIngestClientFactory.java
new file mode 100644
index 00000000000..aaf27127939
--- /dev/null
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureIngestClientFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure;
+
+import com.azure.core.http.policy.ExponentialBackoffOptions;
+import com.azure.core.http.policy.RetryOptions;
+import com.azure.identity.ClientSecretCredentialBuilder;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import org.apache.druid.data.input.azure.AzureStorageAccountInputSourceConfig;
+
+import javax.annotation.Nullable;
+import java.time.Duration;
+
+
+/**
+ * Client factory that builds {@link BlobServiceClient}s from the credentials of an optional
+ * {@link AzureStorageAccountInputSourceConfig}. When that config is absent, or carries none of the
+ * supported credentials, client creation is delegated to the parent {@link AzureClientFactory}.
+ */
+public class AzureIngestClientFactory extends AzureClientFactory
+{
+  private final AzureStorageAccountInputSourceConfig azureStorageAccountInputSourceConfig;
+
+  /**
+   * @param config                               account-level config, also used by the parent factory
+   * @param azureStorageAccountInputSourceConfig credentials supplied with the input source; may be null
+   */
+  public AzureIngestClientFactory(
+      AzureAccountConfig config,
+      @Nullable AzureStorageAccountInputSourceConfig azureStorageAccountInputSourceConfig
+  )
+  {
+    super(config);
+    this.azureStorageAccountInputSourceConfig = azureStorageAccountInputSourceConfig;
+  }
+
+  /**
+   * Builds a client for {@code storageAccount}. Credentials are picked in this order: the key, then the
+   * shared access token, then the app registration (client id and client secret together). If none of
+   * them is available the parent implementation is used instead.
+   *
+   * @param retryCount     maximum number of retries; when null, {@code config.getMaxTries()} is used
+   * @param storageAccount name of the storage account the client talks to
+   */
+  @Override
+  public BlobServiceClient buildNewClient(Integer retryCount, String storageAccount)
+  {
+    final AzureStorageAccountInputSourceConfig inputConfig = azureStorageAccountInputSourceConfig;
+    if (inputConfig == null) {
+      // No properties were passed with the input source: use the default Azure credentials.
+      return super.buildNewClient(retryCount, storageAccount);
+    }
+
+    final BlobServiceClientBuilder builder = new BlobServiceClientBuilder()
+        .endpoint("https://" + storageAccount + "." + config.getBlobStorageEndpoint());
+
+    if (inputConfig.getKey() != null) {
+      builder.credential(new StorageSharedKeyCredential(storageAccount, inputConfig.getKey()));
+    } else if (inputConfig.getSharedAccessStorageToken() != null) {
+      builder.sasToken(inputConfig.getSharedAccessStorageToken());
+    } else if (inputConfig.getAppRegistrationClientId() != null
+               && inputConfig.getAppRegistrationClientSecret() != null) {
+      // NOTE(review): tenantId is passed through unchecked; presumably the credential builder
+      // requires it - confirm.
+      builder.credential(
+          new ClientSecretCredentialBuilder()
+              .clientSecret(inputConfig.getAppRegistrationClientSecret())
+              .clientId(inputConfig.getAppRegistrationClientId())
+              .tenantId(inputConfig.getTenantId())
+              .build()
+      );
+    } else {
+      // The properties carry no credentials: use the default Azure credentials.
+      return super.buildNewClient(retryCount, storageAccount);
+    }
+
+    return builder
+        .retryOptions(new RetryOptions(
+            new ExponentialBackoffOptions()
+                .setMaxRetries(retryCount != null ? retryCount : config.getMaxTries())
+                .setBaseDelay(Duration.ofMillis(1000))
+                .setMaxDelay(Duration.ofMillis(60000))
+        ))
+        .buildClient();
+  }
+}
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
index b929d255c1c..9cf5952bd81 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
@@ -61,12 +61,15 @@ public class AzureStorage
private static final Logger log = new Logger(AzureStorage.class);
private final AzureClientFactory azureClientFactory;
+ private final String defaultStorageAccount;
public AzureStorage(
- AzureClientFactory azureClientFactory
+ AzureClientFactory azureClientFactory,
+ @Nullable String defaultStorageAccount
)
{
this.azureClientFactory = azureClientFactory;
+ this.defaultStorageAccount = defaultStorageAccount;
}
public List<String> emptyCloudBlobDirectory(final String containerName,
final String virtualDirPath)
@@ -216,18 +219,27 @@ public class AzureStorage
@VisibleForTesting
BlobServiceClient getBlobServiceClient(Integer maxAttempts)
{
- return azureClientFactory.getBlobServiceClient(maxAttempts);
+ return azureClientFactory.getBlobServiceClient(maxAttempts,
defaultStorageAccount);
}
+ /**
+  * Obtains the service client for an explicitly named storage account from the client factory.
+  *
+  * @param storageAccount storage account to connect to
+  * @param maxAttempts    retry count handed to the client factory
+  */
+ @VisibleForTesting
+ BlobServiceClient getBlobServiceClient(String storageAccount, Integer maxAttempts)
+ {
+   final BlobServiceClient serviceClient = azureClientFactory.getBlobServiceClient(maxAttempts, storageAccount);
+   return serviceClient;
+ }
+
+ // This method takes an explicit storageAccount because AzureCloudBlobIterator may use a single
+ // AzureStorage instance to list blobs from multiple storage accounts.
@VisibleForTesting
PagedIterable<BlobItem> listBlobsWithPrefixInContainerSegmented(
+ final String storageAccount,
final String containerName,
final String prefix,
int maxResults,
Integer maxAttempts
) throws BlobStorageException
{
- BlobContainerClient blobContainerClient =
getOrCreateBlobContainerClient(containerName, maxAttempts);
+ BlobContainerClient blobContainerClient =
getOrCreateBlobContainerClient(storageAccount, containerName, maxAttempts);
return blobContainerClient.listBlobs(
new
ListBlobsOptions().setPrefix(prefix).setMaxResultsPerPage(maxResults),
Duration.ofMillis(DELTA_BACKOFF_MS)
@@ -243,4 +255,9 @@ public class AzureStorage
{
return
getBlobServiceClient(maxRetries).createBlobContainerIfNotExists(containerName);
}
+
+ /**
+  * Returns a client for {@code containerName} in the given storage account, asking the service
+  * client to create the container if it does not exist yet.
+  */
+ private BlobContainerClient getOrCreateBlobContainerClient(
+     final String storageAccount,
+     final String containerName,
+     final Integer maxRetries
+ )
+ {
+   final BlobServiceClient serviceClient = getBlobServiceClient(storageAccount, maxRetries);
+   return serviceClient.createBlobContainerIfNotExists(containerName);
+ }
}
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
index 25db821601b..19aa5c7fbbe 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
@@ -31,9 +31,11 @@ import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.druid.data.input.azure.AzureEntityFactory;
import org.apache.druid.data.input.azure.AzureInputSource;
+import org.apache.druid.data.input.azure.AzureStorageAccountInputSource;
import org.apache.druid.guice.Binders;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.annotations.Global;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.ISE;
@@ -46,10 +48,6 @@ public class AzureStorageDruidModule implements DruidModule
{
public static final String SCHEME = "azure";
- public static final String
- STORAGE_CONNECTION_STRING_WITH_KEY =
"DefaultEndpointsProtocol=%s;AccountName=%s;AccountKey=%s;EndpointSuffix=%s;";
- public static final String
- STORAGE_CONNECTION_STRING_WITH_TOKEN =
"DefaultEndpointsProtocol=%s;AccountName=%s;SharedAccessSignature=%s;EndpointSuffix=%s;";
public static final String INDEX_ZIP_FILE_NAME = "index.zip";
@Override
@@ -77,7 +75,8 @@ public class AzureStorageDruidModule implements DruidModule
}
},
new SimpleModule().registerSubtypes(
- new NamedType(AzureInputSource.class, SCHEME)
+ new NamedType(AzureInputSource.class, SCHEME),
+ new NamedType(AzureStorageAccountInputSource.class,
AzureStorageAccountInputSource.SCHEME)
)
);
}
@@ -135,11 +134,13 @@ public class AzureStorageDruidModule implements
DruidModule
}
@Provides
+ @Global
@LazySingleton
public AzureStorage getAzureStorageContainer(
- final AzureClientFactory azureClientFactory
+ final AzureClientFactory azureClientFactory,
+ final AzureAccountConfig azureAccountConfig
)
{
- return new AzureStorage(azureClientFactory);
+ return new AzureStorage(azureClientFactory,
azureAccountConfig.getAccount());
}
}
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
index 74e928edc67..ed881d52fcc 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
@@ -23,6 +23,7 @@ import com.azure.storage.blob.models.BlobStorageException;
import com.google.common.base.Optional;
import com.google.inject.Inject;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
+import org.apache.druid.guice.annotations.Global;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
@@ -53,7 +54,7 @@ public class AzureTaskLogs implements TaskLogs
AzureTaskLogsConfig config,
AzureInputDataConfig inputDataConfig,
AzureAccountConfig accountConfig,
- AzureStorage azureStorage,
+ @Global AzureStorage azureStorage,
AzureCloudBlobIterableFactory azureCloudBlobIterableFactory,
CurrentTimeMillisSupplier timeSupplier)
{
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java
index 0c00a633d77..e214ab909ef 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java
@@ -126,7 +126,7 @@ public class AzureUtils
azureCloudBlobIterableFactory.create(ImmutableList.of(new
CloudObjectLocation(
bucket,
prefix
- ).toUri("azure")), config.getMaxListingLength());
+ ).toUri("azure")), config.getMaxListingLength(), storage);
Iterator<CloudBlobHolder> iterator = azureCloudBlobIterable.iterator();
while (iterator.hasNext()) {
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/CloudBlobHolder.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/CloudBlobHolder.java
index d2398aebda1..59ed10ed770 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/CloudBlobHolder.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/blob/CloudBlobHolder.java
@@ -30,11 +30,13 @@ public class CloudBlobHolder
{
private final BlobItem delegate;
private final String container;
+ private final String storageAccount;
- public CloudBlobHolder(BlobItem delegate, String container)
+ public CloudBlobHolder(BlobItem delegate, String container, String
storageAccount)
{
this.delegate = delegate;
this.container = container;
+ this.storageAccount = storageAccount;
}
public String getContainerName()
@@ -42,6 +44,11 @@ public class CloudBlobHolder
return container;
}
+ /**
+  * Returns the storage account name this holder was constructed with.
+  */
+ public String getStorageAccount()
+ {
+   return this.storageAccount;
+ }
+
public String getName()
{
return delegate.getName();
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java
index 4264801f4ac..79be724c17f 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.guice.annotations.Global;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.StorageConnectorProvider;
@@ -36,6 +37,7 @@ import java.io.File;
public class AzureStorageConnectorProvider extends AzureOutputConfig
implements StorageConnectorProvider
{
+ @Global
@JacksonInject
AzureStorage azureStorage;
diff --git
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureEntityTest.java
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureEntityTest.java
index 0ff858336bb..8453b71b4b7 100644
---
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureEntityTest.java
+++
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureEntityTest.java
@@ -24,6 +24,7 @@ import org.apache.commons.io.input.NullInputStream;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.storage.azure.AzureByteSource;
import org.apache.druid.storage.azure.AzureByteSourceFactory;
+import org.apache.druid.storage.azure.AzureStorage;
import org.apache.druid.storage.azure.AzureUtils;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
@@ -37,6 +38,7 @@ import java.net.URI;
public class AzureEntityTest extends EasyMockSupport
{
+ private static final String STORAGE_ACCOUNT_NAME = "storageAccount";
private static final String CONTAINER_NAME = "container";
private static final String BLOB_NAME = "blob";
private static final int OFFSET = 20;
@@ -49,6 +51,7 @@ public class AzureEntityTest extends EasyMockSupport
private AzureByteSource byteSource;
private AzureEntity azureEntity;
+ private AzureStorage azureStorage;
static {
try {
@@ -65,6 +68,7 @@ public class AzureEntityTest extends EasyMockSupport
location = createMock(CloudObjectLocation.class);
byteSourceFactory = createMock(AzureByteSourceFactory.class);
byteSource = createMock(AzureByteSource.class);
+ azureStorage = createMock(AzureStorage.class);
}
@Test
@@ -72,11 +76,11 @@ public class AzureEntityTest extends EasyMockSupport
{
EasyMock.expect(location.getBucket()).andReturn(CONTAINER_NAME);
EasyMock.expect(location.getPath()).andReturn(BLOB_NAME);
- EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME,
BLOB_NAME)).andReturn(byteSource);
+ EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_NAME,
azureStorage)).andReturn(byteSource);
EasyMock.expect(location.toUri(AzureInputSource.SCHEME)).andReturn(ENTITY_URI);
replayAll();
- azureEntity = new AzureEntity(location, byteSourceFactory);
+ azureEntity = new AzureEntity(location, azureStorage,
AzureInputSource.SCHEME, byteSourceFactory);
URI actualUri = azureEntity.getUri();
Assert.assertEquals(ENTITY_URI, actualUri);
@@ -85,16 +89,38 @@ public class AzureEntityTest extends EasyMockSupport
}
+ @Test
+ public void test_getUri_returnsLocationUri_azureStorageScheme()
+ {
+ EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_NAME,
azureStorage)).andReturn(byteSource);
+ replayAll();
+
+ azureEntity = new AzureEntity(
+ new CloudObjectLocation(STORAGE_ACCOUNT_NAME, CONTAINER_NAME + "/" +
BLOB_NAME),
+ azureStorage,
+ AzureStorageAccountInputSource.SCHEME,
+ byteSourceFactory
+ );
+
+ Assert.assertEquals(
+ URI.create(AzureStorageAccountInputSource.SCHEME + "://" +
STORAGE_ACCOUNT_NAME + "/" + CONTAINER_NAME + "/" + BLOB_NAME),
+ azureEntity.getUri()
+ );
+
+ verifyAll();
+
+ }
+
@Test
public void test_readFromStart_returnsExpectedStream() throws Exception
{
EasyMock.expect(location.getBucket()).andReturn(CONTAINER_NAME);
EasyMock.expect(location.getPath()).andReturn(BLOB_NAME);
EasyMock.expect(byteSource.openStream(0)).andReturn(INPUT_STREAM);
- EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME,
BLOB_NAME)).andReturn(byteSource);
+ EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_NAME,
azureStorage)).andReturn(byteSource);
replayAll();
- azureEntity = new AzureEntity(location, byteSourceFactory);
+ azureEntity = new AzureEntity(location, azureStorage,
AzureInputSource.SCHEME, byteSourceFactory);
InputStream actualInputStream = azureEntity.readFrom(0);
Assert.assertSame(INPUT_STREAM, actualInputStream);
@@ -106,10 +132,10 @@ public class AzureEntityTest extends EasyMockSupport
EasyMock.expect(location.getBucket()).andReturn(CONTAINER_NAME);
EasyMock.expect(location.getPath()).andReturn(BLOB_NAME);
EasyMock.expect(byteSource.openStream(OFFSET)).andReturn(INPUT_STREAM);
- EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME,
BLOB_NAME)).andReturn(byteSource);
+ EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_NAME,
azureStorage)).andReturn(byteSource);
replayAll();
- azureEntity = new AzureEntity(location, byteSourceFactory);
+ azureEntity = new AzureEntity(location, azureStorage,
AzureInputSource.SCHEME, byteSourceFactory);
InputStream actualInputStream = azureEntity.readFrom(OFFSET);
Assert.assertSame(INPUT_STREAM, actualInputStream);
@@ -122,10 +148,10 @@ public class AzureEntityTest extends EasyMockSupport
EasyMock.expect(location.getBucket()).andReturn(CONTAINER_NAME);
EasyMock.expect(location.getPath()).andReturn(BLOB_NAME);
EasyMock.expect(byteSource.openStream(OFFSET)).andThrow(IO_EXCEPTION);
- EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME,
BLOB_NAME)).andReturn(byteSource);
+ EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_NAME,
azureStorage)).andReturn(byteSource);
replayAll();
- azureEntity = new AzureEntity(location, byteSourceFactory);
+ azureEntity = new AzureEntity(location, azureStorage,
AzureInputSource.SCHEME, byteSourceFactory);
azureEntity.readFrom(OFFSET);
}
catch (IOException e) {
@@ -138,25 +164,45 @@ public class AzureEntityTest extends EasyMockSupport
{
EasyMock.expect(location.getBucket()).andReturn(CONTAINER_NAME);
EasyMock.expect(location.getPath()).andReturn(BLOB_NAME).atLeastOnce();
- EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME,
BLOB_NAME)).andReturn(byteSource);
+ EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_NAME,
azureStorage)).andReturn(byteSource);
replayAll();
- azureEntity = new AzureEntity(location, byteSourceFactory);
+ azureEntity = new AzureEntity(location, azureStorage,
AzureInputSource.SCHEME, byteSourceFactory);
String actualPath = azureEntity.getPath();
Assert.assertEquals(BLOB_NAME, actualPath);
verifyAll();
}
+ @Test
+ public void test_getPath_azureStorageScheme()
+ {
+ EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_NAME,
azureStorage)).andReturn(byteSource);
+ replayAll();
+
+ azureEntity = new AzureEntity(
+ new CloudObjectLocation(STORAGE_ACCOUNT_NAME, CONTAINER_NAME + "/" +
BLOB_NAME),
+ azureStorage,
+ AzureStorageAccountInputSource.SCHEME,
+ byteSourceFactory
+ );
+
+ Assert.assertEquals(
+ CONTAINER_NAME + "/" + BLOB_NAME,
+ azureEntity.getPath()
+ );
+
+ verifyAll();
+ }
@Test
public void test_getRetryCondition_returnsExpectedRetryCondition()
{
EasyMock.expect(location.getBucket()).andReturn(CONTAINER_NAME);
EasyMock.expect(location.getPath()).andReturn(BLOB_NAME).atLeastOnce();
- EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME,
BLOB_NAME)).andReturn(byteSource);
+ EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_NAME,
azureStorage)).andReturn(byteSource);
replayAll();
- azureEntity = new AzureEntity(location, byteSourceFactory);
+ azureEntity = new AzureEntity(location, azureStorage,
AzureInputSource.SCHEME, byteSourceFactory);
Predicate<Throwable> actualRetryCondition =
azureEntity.getRetryCondition();
Assert.assertSame(AzureUtils.AZURE_RETRY, actualRetryCondition);
}
diff --git
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceSerdeTest.java
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceSerdeTest.java
index 709e8c191c1..e11ce8ba548 100644
---
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceSerdeTest.java
+++
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceSerdeTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.impl.systemfield.SystemField;
import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.storage.azure.AzureAccountConfig;
import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
import org.apache.druid.storage.azure.AzureDataSegmentConfig;
import org.apache.druid.storage.azure.AzureInputDataConfig;
@@ -74,6 +75,8 @@ public class AzureInputSourceSerdeTest extends EasyMockSupport
private AzureEntityFactory entityFactory;
private AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
private AzureInputDataConfig inputDataConfig;
+ private AzureAccountConfig accountConfig;
+
static {
try {
@@ -96,7 +99,7 @@ public class AzureInputSourceSerdeTest extends EasyMockSupport
entityFactory = createMock(AzureEntityFactory.class);
azureCloudBlobIterableFactory =
createMock(AzureCloudBlobIterableFactory.class);
inputDataConfig = createMock(AzureInputDataConfig.class);
-
+ accountConfig = createMock(AzureAccountConfig.class);
}
@Test
@@ -177,6 +180,7 @@ public class AzureInputSourceSerdeTest extends
EasyMockSupport
injectableValues.addValue(AzureEntityFactory.class, entityFactory);
injectableValues.addValue(AzureCloudBlobIterableFactory.class,
azureCloudBlobIterableFactory);
injectableValues.addValue(AzureInputDataConfig.class, inputDataConfig);
+ injectableValues.addValue(AzureAccountConfig.class, accountConfig);
return injectableValues;
}
diff --git
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
index 692c8dfeeba..a7cb7c708c6 100644
---
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
+++
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
@@ -19,6 +19,8 @@
package org.apache.druid.data.input.azure;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobContainerClientBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
@@ -130,7 +132,7 @@ public class AzureInputSourceTest extends EasyMockSupport
@Test
public void test_createEntity_returnsExpectedEntity()
{
-
EasyMock.expect(entityFactory.create(CLOUD_OBJECT_LOCATION_1)).andReturn(azureEntity1);
+ EasyMock.expect(entityFactory.create(CLOUD_OBJECT_LOCATION_1, storage,
AzureInputSource.SCHEME)).andReturn(azureEntity1);
EasyMock.expect(inputSplit.get()).andReturn(ImmutableList.of(CLOUD_OBJECT_LOCATION_1)).times(2);
replayAll();
@@ -161,7 +163,7 @@ public class AzureInputSourceTest extends EasyMockSupport
List<CloudBlobHolder> expectedCloudBlobs =
ImmutableList.of(cloudBlobDruid1);
Iterator<CloudBlobHolder> expectedCloudBlobsIterator =
expectedCloudBlobs.iterator();
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_LISTING_LENGTH);
- EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes,
MAX_LISTING_LENGTH)).andReturn(
+ EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes,
MAX_LISTING_LENGTH, storage)).andReturn(
azureCloudBlobIterable);
EasyMock.expect(azureCloudBlobIterable.iterator()).andReturn(expectedCloudBlobsIterator);
EasyMock.expect(cloudBlobDruid1.getContainerName()).andReturn(CONTAINER).anyTimes();
@@ -209,7 +211,7 @@ public class AzureInputSourceTest extends EasyMockSupport
);
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_LISTING_LENGTH);
- EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes,
MAX_LISTING_LENGTH)).andReturn(
+ EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes,
MAX_LISTING_LENGTH, storage)).andReturn(
azureCloudBlobIterable);
EasyMock.expect(azureCloudBlobIterable.iterator()).andReturn(expectedCloudBlobsIterator);
EasyMock.expect(cloudBlobDruid1.getBlobLength()).andReturn(100L).anyTimes();
@@ -357,7 +359,9 @@ public class AzureInputSourceTest extends EasyMockSupport
final AzureEntity entity = new AzureEntity(
new CloudObjectLocation("foo", "bar"),
- (containerName, blobPath) -> null
+ storage,
+ AzureInputSource.SCHEME,
+ (containerName, blobPath, storage) -> null
);
Assert.assertEquals("azure://foo/bar",
azureInputSource.getSystemFieldValue(entity, SystemField.URI));
@@ -371,7 +375,8 @@ public class AzureInputSourceTest extends EasyMockSupport
EqualsVerifier.forClass(AzureInputSource.class)
.usingGetClass()
.withPrefabValues(Logger.class, new
Logger(AzureStorage.class), new Logger(AzureStorage.class))
- .withPrefabValues(AzureStorage.class, new
AzureStorage(null), new AzureStorage(null))
+ .withPrefabValues(BlobContainerClient.class, new
BlobContainerClientBuilder().buildClient(), new
BlobContainerClientBuilder().buildClient())
+ .withPrefabValues(AzureStorage.class, new AzureStorage(null,
null), new AzureStorage(null, null))
.withNonnullFields("storage")
.withNonnullFields("entityFactory")
.withNonnullFields("azureCloudBlobIterableFactory")
diff --git
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSourceTest.java
similarity index 63%
copy from
extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
copy to
extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSourceTest.java
index 692c8dfeeba..8d17d9ba01e 100644
---
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
+++
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSourceTest.java
@@ -19,6 +19,8 @@
package org.apache.druid.data.input.azure;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobContainerClientBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
@@ -31,10 +33,13 @@ import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.data.input.impl.systemfield.SystemField;
import org.apache.druid.data.input.impl.systemfield.SystemFields;
+import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.storage.azure.AzureAccountConfig;
import org.apache.druid.storage.azure.AzureCloudBlobIterable;
import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
+import org.apache.druid.storage.azure.AzureIngestClientFactory;
import org.apache.druid.storage.azure.AzureInputDataConfig;
import org.apache.druid.storage.azure.AzureStorage;
import org.apache.druid.storage.azure.blob.CloudBlobHolder;
@@ -55,17 +60,18 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-public class AzureInputSourceTest extends EasyMockSupport
+public class AzureStorageAccountInputSourceTest extends EasyMockSupport
{
- private static final String CONTAINER_NAME = "container";
private static final String BLOB_NAME = "blob";
private static final URI PREFIX_URI;
private final List<URI> EMPTY_URIS = ImmutableList.of();
private final List<URI> EMPTY_PREFIXES = ImmutableList.of();
private final List<CloudObjectLocation> EMPTY_OBJECTS = ImmutableList.of();
+ private static final String STORAGE_ACCOUNT = "STORAGE_ACCOUNT";
+ private static final String DEFAULT_STORAGE_ACCOUNT =
"DEFAULT_STORAGE_ACCOUNT";
private static final String CONTAINER = "CONTAINER";
private static final String BLOB_PATH = "BLOB_PATH.csv";
- private static final CloudObjectLocation CLOUD_OBJECT_LOCATION_1 = new
CloudObjectLocation(CONTAINER, BLOB_PATH);
+ private static final CloudObjectLocation CLOUD_OBJECT_LOCATION_1 = new
CloudObjectLocation(STORAGE_ACCOUNT, CONTAINER + "/" + BLOB_PATH);
private static final int MAX_LISTING_LENGTH = 10;
private static final InputFormat INPUT_FORMAT = new JsonInputFormat(
@@ -80,17 +86,19 @@ public class AzureInputSourceTest extends EasyMockSupport
private AzureEntityFactory entityFactory;
private AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
private AzureInputDataConfig inputDataConfig;
+ private AzureStorageAccountInputSourceConfig
azureStorageAccountInputSourceConfig;
+ private AzureAccountConfig azureAccountConfig;
private InputSplit<List<CloudObjectLocation>> inputSplit;
private AzureEntity azureEntity1;
private CloudBlobHolder cloudBlobDruid1;
private AzureCloudBlobIterable azureCloudBlobIterable;
- private AzureInputSource azureInputSource;
+ private AzureStorageAccountInputSource azureInputSource;
static {
try {
- PREFIX_URI = new URI(AzureInputSource.SCHEME + "://" + CONTAINER_NAME +
"/" + BLOB_NAME);
+ PREFIX_URI = new URI(AzureStorageAccountInputSource.SCHEME + "://" +
STORAGE_ACCOUNT + "/" + CONTAINER + "/" + BLOB_NAME);
}
catch (Exception e) {
throw new RuntimeException(e);
@@ -108,21 +116,25 @@ public class AzureInputSourceTest extends EasyMockSupport
inputDataConfig = createMock(AzureInputDataConfig.class);
cloudBlobDruid1 = createMock(CloudBlobHolder.class);
azureCloudBlobIterable = createMock(AzureCloudBlobIterable.class);
+ azureStorageAccountInputSourceConfig =
createMock(AzureStorageAccountInputSourceConfig.class);
+ azureAccountConfig = createMock(AzureAccountConfig.class);
+
EasyMock.expect(azureAccountConfig.getAccount()).andReturn(DEFAULT_STORAGE_ACCOUNT).anyTimes();
}
@Test(expected = IllegalArgumentException.class)
public void
test_constructor_emptyUrisEmptyPrefixesEmptyObjects_throwsIllegalArgumentException()
{
replayAll();
- azureInputSource = new AzureInputSource(
- storage,
+ azureInputSource = new AzureStorageAccountInputSource(
entityFactory,
azureCloudBlobIterableFactory,
inputDataConfig,
+ azureAccountConfig,
EMPTY_URIS,
EMPTY_PREFIXES,
EMPTY_OBJECTS,
null,
+ azureStorageAccountInputSourceConfig,
null
);
}
@@ -130,20 +142,21 @@ public class AzureInputSourceTest extends EasyMockSupport
@Test
public void test_createEntity_returnsExpectedEntity()
{
-
EasyMock.expect(entityFactory.create(CLOUD_OBJECT_LOCATION_1)).andReturn(azureEntity1);
+ EasyMock.expect(entityFactory.create(EasyMock.eq(CLOUD_OBJECT_LOCATION_1),
EasyMock.anyObject(AzureStorage.class),
EasyMock.eq(AzureStorageAccountInputSource.SCHEME))).andReturn(azureEntity1);
EasyMock.expect(inputSplit.get()).andReturn(ImmutableList.of(CLOUD_OBJECT_LOCATION_1)).times(2);
replayAll();
List<CloudObjectLocation> objects =
ImmutableList.of(CLOUD_OBJECT_LOCATION_1);
- azureInputSource = new AzureInputSource(
- storage,
+ azureInputSource = new AzureStorageAccountInputSource(
entityFactory,
azureCloudBlobIterableFactory,
inputDataConfig,
+ azureAccountConfig,
EMPTY_URIS,
EMPTY_PREFIXES,
objects,
null,
+ azureStorageAccountInputSourceConfig,
null
);
@@ -161,23 +174,25 @@ public class AzureInputSourceTest extends EasyMockSupport
List<CloudBlobHolder> expectedCloudBlobs =
ImmutableList.of(cloudBlobDruid1);
Iterator<CloudBlobHolder> expectedCloudBlobsIterator =
expectedCloudBlobs.iterator();
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_LISTING_LENGTH);
- EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes,
MAX_LISTING_LENGTH)).andReturn(
+
EasyMock.expect(azureCloudBlobIterableFactory.create(EasyMock.eq(prefixes),
EasyMock.eq(MAX_LISTING_LENGTH),
EasyMock.anyObject(AzureStorage.class))).andReturn(
azureCloudBlobIterable);
EasyMock.expect(azureCloudBlobIterable.iterator()).andReturn(expectedCloudBlobsIterator);
+
EasyMock.expect(cloudBlobDruid1.getStorageAccount()).andReturn(STORAGE_ACCOUNT).anyTimes();
EasyMock.expect(cloudBlobDruid1.getContainerName()).andReturn(CONTAINER).anyTimes();
EasyMock.expect(cloudBlobDruid1.getName()).andReturn(BLOB_PATH).anyTimes();
EasyMock.expect(cloudBlobDruid1.getBlobLength()).andReturn(100L).anyTimes();
replayAll();
- azureInputSource = new AzureInputSource(
- storage,
+ azureInputSource = new AzureStorageAccountInputSource(
entityFactory,
azureCloudBlobIterableFactory,
inputDataConfig,
+ azureAccountConfig,
EMPTY_URIS,
prefixes,
EMPTY_OBJECTS,
null,
+ azureStorageAccountInputSourceConfig,
null
);
@@ -187,7 +202,7 @@ public class AzureInputSourceTest extends EasyMockSupport
);
List<List<CloudObjectLocation>> actualCloudLocationList =
cloudObjectStream.map(InputSplit::get)
-
.collect(Collectors.toList());
+ .collect(Collectors.toList());
verifyAll();
Assert.assertEquals(expectedCloudLocations, actualCloudLocationList);
}
@@ -209,24 +224,26 @@ public class AzureInputSourceTest extends EasyMockSupport
);
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_LISTING_LENGTH);
- EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes,
MAX_LISTING_LENGTH)).andReturn(
+
EasyMock.expect(azureCloudBlobIterableFactory.create(EasyMock.eq(prefixes),
EasyMock.eq(MAX_LISTING_LENGTH),
EasyMock.anyObject(AzureStorage.class))).andReturn(
azureCloudBlobIterable);
EasyMock.expect(azureCloudBlobIterable.iterator()).andReturn(expectedCloudBlobsIterator);
+
EasyMock.expect(cloudBlobDruid1.getStorageAccount()).andReturn(STORAGE_ACCOUNT).anyTimes();
EasyMock.expect(cloudBlobDruid1.getBlobLength()).andReturn(100L).anyTimes();
EasyMock.expect(cloudBlobDruid1.getContainerName()).andReturn(CONTAINER).anyTimes();
EasyMock.expect(cloudBlobDruid1.getName()).andReturn(BLOB_PATH).anyTimes();
replayAll();
- azureInputSource = new AzureInputSource(
- storage,
+ azureInputSource = new AzureStorageAccountInputSource(
entityFactory,
azureCloudBlobIterableFactory,
inputDataConfig,
+ azureAccountConfig,
EMPTY_URIS,
prefixes,
EMPTY_OBJECTS,
objectGlob,
+ azureStorageAccountInputSourceConfig,
null
);
@@ -236,7 +253,7 @@ public class AzureInputSourceTest extends EasyMockSupport
);
List<List<CloudObjectLocation>> actualCloudLocationList =
cloudObjectStream.map(InputSplit::get)
-
.collect(Collectors.toList());
+ .collect(Collectors.toList());
verifyAll();
Assert.assertEquals(expectedCloudLocations, actualCloudLocationList);
}
@@ -248,15 +265,16 @@ public class AzureInputSourceTest extends EasyMockSupport
EasyMock.expect(inputSplit.get()).andReturn(ImmutableList.of(CLOUD_OBJECT_LOCATION_1));
replayAll();
- azureInputSource = new AzureInputSource(
- storage,
+ azureInputSource = new AzureStorageAccountInputSource(
entityFactory,
azureCloudBlobIterableFactory,
inputDataConfig,
+ azureAccountConfig,
EMPTY_URIS,
prefixes,
EMPTY_OBJECTS,
null,
+ azureStorageAccountInputSourceConfig,
null
);
@@ -269,22 +287,22 @@ public class AzureInputSourceTest extends EasyMockSupport
public void test_toString_returnsExpectedString()
{
List<URI> prefixes = ImmutableList.of(PREFIX_URI);
- azureInputSource = new AzureInputSource(
- storage,
+ azureInputSource = new AzureStorageAccountInputSource(
entityFactory,
azureCloudBlobIterableFactory,
inputDataConfig,
+ azureAccountConfig,
EMPTY_URIS,
prefixes,
EMPTY_OBJECTS,
null,
+ azureStorageAccountInputSourceConfig,
null
);
-
- String actualToString = azureInputSource.toString();
+ String azureStorageAccountInputSourceString = azureInputSource.toString();
Assert.assertEquals(
- "AzureInputSource{uris=[], prefixes=[azure://container/blob],
objects=[], objectGlob=null}",
- actualToString
+ "AzureStorageAccountInputSource{uris=[],
prefixes=[azureStorage://STORAGE_ACCOUNT/CONTAINER/blob], objects=[],
objectGlob=null, azureStorageAccountInputSourceConfig=" +
azureStorageAccountInputSourceConfig + "}",
+ azureStorageAccountInputSourceString
);
}
@@ -292,28 +310,31 @@ public class AzureInputSourceTest extends EasyMockSupport
public void test_toString_withAllSystemFields_returnsExpectedString()
{
List<URI> prefixes = ImmutableList.of(PREFIX_URI);
- azureInputSource = new AzureInputSource(
- storage,
+ azureInputSource = new AzureStorageAccountInputSource(
entityFactory,
azureCloudBlobIterableFactory,
inputDataConfig,
+ azureAccountConfig,
EMPTY_URIS,
prefixes,
EMPTY_OBJECTS,
null,
+ azureStorageAccountInputSourceConfig,
new SystemFields(EnumSet.of(SystemField.URI, SystemField.BUCKET,
SystemField.PATH))
);
- String actualToString = azureInputSource.toString();
+ String azureStorageAccountInputSourceString = azureInputSource.toString();
+
Assert.assertEquals(
- "AzureInputSource{"
- + "uris=[], "
- + "prefixes=[azure://container/blob], "
- + "objects=[], "
- + "objectGlob=null, "
- + "systemFields=[__file_uri, __file_bucket, __file_path]"
- + "}",
- actualToString
+ "AzureStorageAccountInputSource{"
+ + "uris=[], "
+ + "prefixes=[azureStorage://STORAGE_ACCOUNT/CONTAINER/blob], "
+ + "objects=[], "
+ + "objectGlob=null, "
+ + "azureStorageAccountInputSourceConfig=" +
azureStorageAccountInputSourceConfig + ", "
+ + "systemFields=[__file_uri, __file_bucket, __file_path]"
+ + "}",
+ azureStorageAccountInputSourceString
);
}
@@ -321,32 +342,34 @@ public class AzureInputSourceTest extends EasyMockSupport
public void test_getTypes_returnsExpectedTypes()
{
List<URI> prefixes = ImmutableList.of(PREFIX_URI);
- azureInputSource = new AzureInputSource(
- storage,
+ azureInputSource = new AzureStorageAccountInputSource(
entityFactory,
azureCloudBlobIterableFactory,
inputDataConfig,
+ azureAccountConfig,
EMPTY_URIS,
prefixes,
EMPTY_OBJECTS,
null,
+ azureStorageAccountInputSourceConfig,
null
);
- Assert.assertEquals(ImmutableSet.of(AzureInputSource.SCHEME),
azureInputSource.getTypes());
+
Assert.assertEquals(ImmutableSet.of(AzureStorageAccountInputSource.SCHEME),
azureInputSource.getTypes());
}
@Test
public void test_systemFields()
{
- azureInputSource = (AzureInputSource) new AzureInputSource(
- storage,
+ azureInputSource = (AzureStorageAccountInputSource) new
AzureStorageAccountInputSource(
entityFactory,
azureCloudBlobIterableFactory,
inputDataConfig,
+ azureAccountConfig,
EMPTY_URIS,
ImmutableList.of(PREFIX_URI),
EMPTY_OBJECTS,
null,
+ azureStorageAccountInputSourceConfig,
new SystemFields(EnumSet.of(SystemField.URI, SystemField.BUCKET,
SystemField.PATH))
);
@@ -356,29 +379,56 @@ public class AzureInputSourceTest extends EasyMockSupport
);
final AzureEntity entity = new AzureEntity(
- new CloudObjectLocation("foo", "bar"),
- (containerName, blobPath) -> null
+ new CloudObjectLocation("foo", "container/bar"),
+ storage,
+ AzureStorageAccountInputSource.SCHEME,
+ (containerName, blobPath, storage) -> null
);
- Assert.assertEquals("azure://foo/bar",
azureInputSource.getSystemFieldValue(entity, SystemField.URI));
+ Assert.assertEquals("azureStorage://foo/container/bar",
azureInputSource.getSystemFieldValue(entity, SystemField.URI));
Assert.assertEquals("foo", azureInputSource.getSystemFieldValue(entity,
SystemField.BUCKET));
- Assert.assertEquals("bar", azureInputSource.getSystemFieldValue(entity,
SystemField.PATH));
+ Assert.assertEquals("container/bar",
azureInputSource.getSystemFieldValue(entity, SystemField.PATH));
}
@Test
public void abidesEqualsContract()
{
- EqualsVerifier.forClass(AzureInputSource.class)
- .usingGetClass()
- .withPrefabValues(Logger.class, new
Logger(AzureStorage.class), new Logger(AzureStorage.class))
- .withPrefabValues(AzureStorage.class, new
AzureStorage(null), new AzureStorage(null))
- .withNonnullFields("storage")
- .withNonnullFields("entityFactory")
- .withNonnullFields("azureCloudBlobIterableFactory")
- .withNonnullFields("inputDataConfig")
- .withNonnullFields("objectGlob")
- .withNonnullFields("scheme")
- .verify();
+ EqualsVerifier.forClass(AzureStorageAccountInputSource.class)
+ .usingGetClass()
+ .withPrefabValues(Logger.class, new Logger(AzureStorage.class), new
Logger(AzureStorage.class))
+ .withPrefabValues(BlobContainerClient.class, new
BlobContainerClientBuilder().buildClient(), new
BlobContainerClientBuilder().buildClient())
+ .withPrefabValues(AzureIngestClientFactory.class, new
AzureIngestClientFactory(null, null), new AzureIngestClientFactory(null, null))
+ .withIgnoredFields("entityFactory")
+ .withIgnoredFields("azureCloudBlobIterableFactory")
+ .withNonnullFields("inputDataConfig")
+ .withNonnullFields("objectGlob")
+ .withNonnullFields("scheme")
+ .withNonnullFields("azureStorageAccountInputSourceConfig")
+ .withNonnullFields("azureAccountConfig")
+ .withNonnullFields("azureIngestClientFactory")
+ .verify();
+ }
+
+ @Test
+ public void test_getContainerAndPathFromObjectLocation()
+ {
+ Pair<String, String> storageLocation =
AzureStorageAccountInputSource.getContainerAndPathFromObjectLocation(
+ CLOUD_OBJECT_LOCATION_1
+ );
+ Assert.assertEquals(CONTAINER, storageLocation.lhs);
+ Assert.assertEquals(BLOB_PATH, storageLocation.rhs);
+
+ }
+
+ @Test
+ public void test_getContainerAndPathFromObjectLocatio_nullpath()
+ {
+ Pair<String, String> storageLocation =
AzureStorageAccountInputSource.getContainerAndPathFromObjectLocation(
+ new CloudObjectLocation(STORAGE_ACCOUNT, CONTAINER)
+ );
+ Assert.assertEquals(CONTAINER, storageLocation.lhs);
+ Assert.assertEquals("", storageLocation.rhs);
+
}
@After
diff --git
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureClientFactoryTest.java
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureClientFactoryTest.java
index bbf07b402dd..4093719d182 100644
---
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureClientFactoryTest.java
+++
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureClientFactoryTest.java
@@ -41,8 +41,7 @@ public class AzureClientFactoryTest
{
AzureAccountConfig config = new AzureAccountConfig();
azureClientFactory = new AzureClientFactory(config);
- config.setAccount(ACCOUNT);
- BlobServiceClient blobServiceClient =
azureClientFactory.getBlobServiceClient(null);
+ BlobServiceClient blobServiceClient =
azureClientFactory.getBlobServiceClient(null, ACCOUNT);
Assert.assertEquals(ACCOUNT, blobServiceClient.getAccountName());
}
@@ -51,9 +50,8 @@ public class AzureClientFactoryTest
{
AzureAccountConfig config = new AzureAccountConfig();
config.setKey("key");
- config.setAccount(ACCOUNT);
azureClientFactory = new AzureClientFactory(config);
- BlobServiceClient blobServiceClient =
azureClientFactory.getBlobServiceClient(null);
+ BlobServiceClient blobServiceClient =
azureClientFactory.getBlobServiceClient(null, ACCOUNT);
StorageSharedKeyCredential storageSharedKeyCredential =
StorageSharedKeyCredential.getSharedKeyCredentialFromPipeline(
blobServiceClient.getHttpPipeline()
);
@@ -71,9 +69,8 @@ public class AzureClientFactoryTest
{
AzureAccountConfig config = new AzureAccountConfig();
config.setSharedAccessStorageToken("sasToken");
- config.setAccount(ACCOUNT);
azureClientFactory = new AzureClientFactory(config);
- BlobServiceClient blobServiceClient =
azureClientFactory.getBlobServiceClient(null);
+ BlobServiceClient blobServiceClient =
azureClientFactory.getBlobServiceClient(null, ACCOUNT);
AzureSasCredentialPolicy azureSasCredentialPolicy = null;
for (int i = 0; i < blobServiceClient.getHttpPipeline().getPolicyCount();
i++) {
if (blobServiceClient.getHttpPipeline().getPolicy(i) instanceof
AzureSasCredentialPolicy) {
@@ -89,9 +86,8 @@ public class AzureClientFactoryTest
{
AzureAccountConfig config = new AzureAccountConfig();
config.setUseAzureCredentialsChain(true);
- config.setAccount(ACCOUNT);
azureClientFactory = new AzureClientFactory(config);
- BlobServiceClient blobServiceClient =
azureClientFactory.getBlobServiceClient(null);
+ BlobServiceClient blobServiceClient =
azureClientFactory.getBlobServiceClient(null, ACCOUNT);
BearerTokenAuthenticationPolicy bearerTokenAuthenticationPolicy = null;
for (int i = 0; i < blobServiceClient.getHttpPipeline().getPolicyCount();
i++) {
if (blobServiceClient.getHttpPipeline().getPolicy(i) instanceof
BearerTokenAuthenticationPolicy) {
@@ -107,10 +103,9 @@ public class AzureClientFactoryTest
{
AzureAccountConfig config = new AzureAccountConfig();
config.setUseAzureCredentialsChain(true);
- config.setAccount(ACCOUNT);
azureClientFactory = new AzureClientFactory(config);
- BlobServiceClient blobServiceClient =
azureClientFactory.getBlobServiceClient(null);
- BlobServiceClient blobServiceClient2 =
azureClientFactory.getBlobServiceClient(null);
+ BlobServiceClient blobServiceClient =
azureClientFactory.getBlobServiceClient(null, ACCOUNT);
+ BlobServiceClient blobServiceClient2 =
azureClientFactory.getBlobServiceClient(null, ACCOUNT);
Assert.assertEquals(blobServiceClient, blobServiceClient2);
}
@@ -119,10 +114,9 @@ public class AzureClientFactoryTest
{
AzureAccountConfig config = new AzureAccountConfig();
config.setUseAzureCredentialsChain(true);
- config.setAccount(ACCOUNT);
azureClientFactory = new AzureClientFactory(config);
- BlobServiceClient blobServiceClient =
azureClientFactory.getBlobServiceClient(null);
- BlobServiceClient blobServiceClient2 =
azureClientFactory.getBlobServiceClient(1);
+ BlobServiceClient blobServiceClient =
azureClientFactory.getBlobServiceClient(null, ACCOUNT);
+ BlobServiceClient blobServiceClient2 =
azureClientFactory.getBlobServiceClient(1, ACCOUNT);
Assert.assertNotEquals(blobServiceClient, blobServiceClient2);
}
@@ -131,12 +125,11 @@ public class AzureClientFactoryTest
{
AzureAccountConfig config = EasyMock.createMock(AzureAccountConfig.class);
EasyMock.expect(config.getKey()).andReturn("key").times(2);
- EasyMock.expect(config.getAccount()).andReturn(ACCOUNT).times(2);
EasyMock.expect(config.getMaxTries()).andReturn(3);
EasyMock.expect(config.getBlobStorageEndpoint()).andReturn(AzureUtils.AZURE_STORAGE_HOST_ADDRESS);
azureClientFactory = new AzureClientFactory(config);
EasyMock.replay(config);
- azureClientFactory.getBlobServiceClient(null);
+ azureClientFactory.getBlobServiceClient(null, ACCOUNT);
EasyMock.verify(config);
}
}
diff --git
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java
index d66c3c21156..996028377ed 100644
---
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java
+++
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java
@@ -54,11 +54,16 @@ public class AzureCloudBlobIteratorTest extends
EasyMockSupport
private final Integer MAX_TRIES = 3;
private final Integer MAX_LISTING_LENGTH = 10;
private final String CONTAINER = "container";
+ private final String STORAGE_ACCOUNT = "storageAccount";
+ private final String DEFAULT_STORAGE_ACCOUNT = "defaultStorageAccount";
+
@Before
public void setup()
{
config.setMaxTries(MAX_TRIES);
+ config.setAccount(DEFAULT_STORAGE_ACCOUNT);
+
}
@Test
@@ -86,7 +91,108 @@ public class AzureCloudBlobIteratorTest extends
EasyMockSupport
SettableSupplier<PagedResponse<BlobItem>> supplier = new
SettableSupplier<>();
supplier.set(new TestPagedResponse<>(ImmutableList.of(blobItem)));
PagedIterable<BlobItem> pagedIterable = new PagedIterable<>(supplier);
- EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(CONTAINER,
"dir1", MAX_LISTING_LENGTH, MAX_TRIES))
+
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(DEFAULT_STORAGE_ACCOUNT,
CONTAINER, "dir1", MAX_LISTING_LENGTH, MAX_TRIES))
+ .andReturn(pagedIterable);
+
+ BlobItem blobPrefixItem = new
BlobItem().setIsPrefix(true).setName("subdir").setProperties(new
BlobItemProperties());
+ BlobItem blobItem2 = new BlobItem().setName("blobName2").setProperties(new
BlobItemProperties().setContentLength(10L));
+ SettableSupplier<PagedResponse<BlobItem>> supplier2 = new
SettableSupplier<>();
+ supplier2.set(new TestPagedResponse<>(ImmutableList.of(blobPrefixItem,
blobItem2)));
+ PagedIterable<BlobItem> pagedIterable2 = new PagedIterable<>(supplier2);
+
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(DEFAULT_STORAGE_ACCOUNT,
CONTAINER, "dir2", MAX_LISTING_LENGTH, MAX_TRIES))
+ .andReturn(pagedIterable2);
+
+ replayAll();
+ azureCloudBlobIterator = new AzureCloudBlobIterator(
+ storage,
+ config,
+ prefixes,
+ MAX_LISTING_LENGTH
+ );
+ List<CloudBlobHolder> actualBlobItems = new ArrayList<>();
+ while (azureCloudBlobIterator.hasNext()) {
+ actualBlobItems.add(azureCloudBlobIterator.next());
+ }
+ verifyAll();
+ List<CloudBlobHolder> expectedBlobItems = ImmutableList.of(
+ new CloudBlobHolder(blobItem, CONTAINER, DEFAULT_STORAGE_ACCOUNT),
+ new CloudBlobHolder(blobItem2, CONTAINER, DEFAULT_STORAGE_ACCOUNT)
+ );
+ Assert.assertEquals(expectedBlobItems.size(), actualBlobItems.size());
+ Assert.assertEquals(
+
expectedBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet()),
+
actualBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet())
+ );
+ Assert.assertEquals(
+
expectedBlobItems.stream().map(CloudBlobHolder::getStorageAccount).collect(Collectors.toSet()),
+
actualBlobItems.stream().map(CloudBlobHolder::getStorageAccount).collect(Collectors.toSet())
+ );
+ Assert.assertEquals(
+
expectedBlobItems.stream().map(CloudBlobHolder::getContainerName).collect(Collectors.toSet()),
+
actualBlobItems.stream().map(CloudBlobHolder::getContainerName).collect(Collectors.toSet())
+ );
+ }
+
+ @Test
+ public void
test_next_prefixesWithMultipleBlobsAndOneDirectory_returnsExpectedBlobs()
throws Exception
+ {
+ List<URI> prefixes = ImmutableList.of(
+ new URI(StringUtils.format("azure://%s/dir1", CONTAINER))
+ );
+
+ BlobItem blobItem = new BlobItem().setName("blobName").setProperties(new
BlobItemProperties().setContentLength(10L));
+ BlobItem blobItem2 = new BlobItem().setName("blobName2").setProperties(new
BlobItemProperties().setContentLength(10L));
+ SettableSupplier<PagedResponse<BlobItem>> supplier = new
SettableSupplier<>();
+ supplier.set(new TestPagedResponse<>(ImmutableList.of(blobItem,
blobItem2)));
+ PagedIterable<BlobItem> pagedIterable = new PagedIterable<>(supplier);
+
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(DEFAULT_STORAGE_ACCOUNT,
CONTAINER, "dir1", MAX_LISTING_LENGTH, MAX_TRIES))
+ .andReturn(pagedIterable);
+
+
+ replayAll();
+ azureCloudBlobIterator = new AzureCloudBlobIterator(
+ storage,
+ config,
+ prefixes,
+ MAX_LISTING_LENGTH
+ );
+ List<CloudBlobHolder> actualBlobItems = new ArrayList<>();
+ while (azureCloudBlobIterator.hasNext()) {
+ actualBlobItems.add(azureCloudBlobIterator.next());
+ }
+ verifyAll();
+ List<CloudBlobHolder> expectedBlobItems = ImmutableList.of(
+ new CloudBlobHolder(blobItem, CONTAINER, DEFAULT_STORAGE_ACCOUNT),
+ new CloudBlobHolder(blobItem2, CONTAINER, DEFAULT_STORAGE_ACCOUNT)
+ );
+ Assert.assertEquals(expectedBlobItems.size(), actualBlobItems.size());
+ Assert.assertEquals(
+
expectedBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet()),
+
actualBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet())
+ );
+ Assert.assertEquals(
+
expectedBlobItems.stream().map(CloudBlobHolder::getStorageAccount).collect(Collectors.toSet()),
+
actualBlobItems.stream().map(CloudBlobHolder::getStorageAccount).collect(Collectors.toSet())
+ );
+ Assert.assertEquals(
+
expectedBlobItems.stream().map(CloudBlobHolder::getContainerName).collect(Collectors.toSet()),
+
actualBlobItems.stream().map(CloudBlobHolder::getContainerName).collect(Collectors.toSet())
+ );
+ }
+
+ @Test
+ public void
test_next_prefixesWithMultipleBlobsAndSomeDirectories_returnsExpectedBlobs_azureStorage()
throws Exception
+ {
+ List<URI> prefixes = ImmutableList.of(
+ new URI(StringUtils.format("azureStorage://%s/%s/dir1",
STORAGE_ACCOUNT, CONTAINER)),
+ new URI(StringUtils.format("azureStorage://%s/%s/dir2",
STORAGE_ACCOUNT, CONTAINER))
+ );
+
+ BlobItem blobItem = new BlobItem().setName("blobName").setProperties(new
BlobItemProperties().setContentLength(10L));
+ SettableSupplier<PagedResponse<BlobItem>> supplier = new
SettableSupplier<>();
+ supplier.set(new TestPagedResponse<>(ImmutableList.of(blobItem)));
+ PagedIterable<BlobItem> pagedIterable = new PagedIterable<>(supplier);
+
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(STORAGE_ACCOUNT,
CONTAINER, "dir1", MAX_LISTING_LENGTH, MAX_TRIES))
.andReturn(pagedIterable);
BlobItem blobPrefixItem = new
BlobItem().setIsPrefix(true).setName("subdir").setProperties(new
BlobItemProperties());
@@ -94,7 +200,7 @@ public class AzureCloudBlobIteratorTest extends
EasyMockSupport
SettableSupplier<PagedResponse<BlobItem>> supplier2 = new
SettableSupplier<>();
supplier2.set(new TestPagedResponse<>(ImmutableList.of(blobPrefixItem,
blobItem2)));
PagedIterable<BlobItem> pagedIterable2 = new PagedIterable<>(supplier2);
- EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(CONTAINER,
"dir2", MAX_LISTING_LENGTH, MAX_TRIES))
+
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(STORAGE_ACCOUNT,
CONTAINER, "dir2", MAX_LISTING_LENGTH, MAX_TRIES))
.andReturn(pagedIterable2);
replayAll();
@@ -110,13 +216,22 @@ public class AzureCloudBlobIteratorTest extends
EasyMockSupport
}
verifyAll();
List<CloudBlobHolder> expectedBlobItems = ImmutableList.of(
- new CloudBlobHolder(blobItem, CONTAINER),
- new CloudBlobHolder(blobItem2, CONTAINER)
+ new CloudBlobHolder(blobItem, CONTAINER, STORAGE_ACCOUNT),
+ new CloudBlobHolder(blobItem2, CONTAINER, STORAGE_ACCOUNT)
);
Assert.assertEquals(expectedBlobItems.size(), actualBlobItems.size());
Assert.assertEquals(
expectedBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet()),
-
actualBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet()));
+
actualBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet())
+ );
+ Assert.assertEquals(
+
expectedBlobItems.stream().map(CloudBlobHolder::getStorageAccount).collect(Collectors.toSet()),
+
actualBlobItems.stream().map(CloudBlobHolder::getStorageAccount).collect(Collectors.toSet())
+ );
+ Assert.assertEquals(
+
expectedBlobItems.stream().map(CloudBlobHolder::getContainerName).collect(Collectors.toSet()),
+
actualBlobItems.stream().map(CloudBlobHolder::getContainerName).collect(Collectors.toSet())
+ );
}
@Test
@@ -132,7 +247,7 @@ public class AzureCloudBlobIteratorTest extends
EasyMockSupport
SettableSupplier<PagedResponse<BlobItem>> supplier = new
SettableSupplier<>();
supplier.set(new TestPagedResponse<>(ImmutableList.of(blobItem,
blobItem2)));
PagedIterable<BlobItem> pagedIterable = new PagedIterable<>(supplier);
- EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(CONTAINER,
"dir1", MAX_LISTING_LENGTH, MAX_TRIES))
+
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(DEFAULT_STORAGE_ACCOUNT,
CONTAINER, "dir1", MAX_LISTING_LENGTH, MAX_TRIES))
.andReturn(pagedIterable);
replayAll();
@@ -148,12 +263,21 @@ public class AzureCloudBlobIteratorTest extends
EasyMockSupport
}
verifyAll();
List<CloudBlobHolder> expectedBlobItems = ImmutableList.of(
- new CloudBlobHolder(blobItem, CONTAINER)
+ new CloudBlobHolder(blobItem, CONTAINER, DEFAULT_STORAGE_ACCOUNT)
);
Assert.assertEquals(expectedBlobItems.size(), actualBlobItems.size());
Assert.assertEquals(
expectedBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet()),
-
actualBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet()));
+
actualBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet())
+ );
+ Assert.assertEquals(
+
expectedBlobItems.stream().map(CloudBlobHolder::getStorageAccount).collect(Collectors.toSet()),
+
actualBlobItems.stream().map(CloudBlobHolder::getStorageAccount).collect(Collectors.toSet())
+ );
+ Assert.assertEquals(
+
expectedBlobItems.stream().map(CloudBlobHolder::getContainerName).collect(Collectors.toSet()),
+
actualBlobItems.stream().map(CloudBlobHolder::getContainerName).collect(Collectors.toSet())
+ );
}
@Test(expected = NoSuchElementException.class)
@@ -176,6 +300,7 @@ public class AzureCloudBlobIteratorTest extends
EasyMockSupport
);
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(
+ EasyMock.anyString(),
EasyMock.anyString(),
EasyMock.anyString(),
EasyMock.anyInt(),
@@ -199,6 +324,7 @@ public class AzureCloudBlobIteratorTest extends
EasyMockSupport
new URI(StringUtils.format("azure://%s/dir1", CONTAINER))
);
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(
+ EasyMock.anyString(),
EasyMock.anyString(),
EasyMock.anyString(),
EasyMock.anyInt(),
diff --git
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java
index 2c24ea23e98..55b5a7dc612 100644
---
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java
+++
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java
@@ -184,7 +184,9 @@ public class AzureDataSegmentKillerTest extends
EasyMockSupport
azureCloudBlobIterableFactory,
MAX_KEYS,
PREFIX_URI,
- ImmutableList.of(object1, object2));
+ ImmutableList.of(object1, object2),
+ azureStorage
+ );
EasyMock.replay(object1, object2);
AzureTestUtils.expectDeleteObjects(
@@ -217,7 +219,8 @@ public class AzureDataSegmentKillerTest extends
EasyMockSupport
azureCloudBlobIterableFactory,
MAX_KEYS,
PREFIX_URI,
- ImmutableList.of(object1)
+ ImmutableList.of(object1),
+ azureStorage
);
EasyMock.replay(object1);
diff --git
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java
index ac851877c53..ebcefd79571 100644
---
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java
+++
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java
@@ -62,12 +62,12 @@ public class AzureDataSegmentPullerTest extends
EasyMockSupport
final InputStream zipStream = new FileInputStream(pulledFile);
final AzureAccountConfig config = new AzureAccountConfig();
- EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME,
BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME,
BLOB_PATH));
+ EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH,
azureStorage)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME,
BLOB_PATH));
EasyMock.expect(azureStorage.getBlockBlobInputStream(0L, CONTAINER_NAME,
BLOB_PATH)).andReturn(zipStream);
replayAll();
- AzureDataSegmentPuller puller = new
AzureDataSegmentPuller(byteSourceFactory, config);
+ AzureDataSegmentPuller puller = new
AzureDataSegmentPuller(byteSourceFactory, azureStorage, config);
FileUtils.FileCopyResult result = puller.getSegmentFiles(CONTAINER_NAME,
BLOB_PATH, toDir);
@@ -95,12 +95,12 @@ public class AzureDataSegmentPullerTest extends
EasyMockSupport
final InputStream zipStream = new FileInputStream(pulledFile);
final AzureAccountConfig config = new AzureAccountConfig();
- EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME,
BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME,
BLOB_PATH));
+ EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH,
azureStorage)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME,
BLOB_PATH));
EasyMock.expect(azureStorage.getBlockBlobInputStream(0L, CONTAINER_NAME,
BLOB_PATH)).andReturn(zipStream);
replayAll();
- AzureDataSegmentPuller puller = new
AzureDataSegmentPuller(byteSourceFactory, config);
+ AzureDataSegmentPuller puller = new
AzureDataSegmentPuller(byteSourceFactory, azureStorage, config);
FileUtils.FileCopyResult result = puller.getSegmentFiles(CONTAINER_NAME,
BLOB_PATH_HADOOP, toDir);
@@ -125,7 +125,7 @@ public class AzureDataSegmentPullerTest extends
EasyMockSupport
final File outDir = FileUtils.createTempDir();
try {
- EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME,
BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME,
BLOB_PATH));
+ EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH,
azureStorage)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME,
BLOB_PATH));
EasyMock.expect(azureStorage.getBlockBlobInputStream(0L, CONTAINER_NAME,
BLOB_PATH)).andThrow(
new RuntimeException(
"error"
@@ -134,7 +134,7 @@ public class AzureDataSegmentPullerTest extends
EasyMockSupport
replayAll();
- AzureDataSegmentPuller puller = new
AzureDataSegmentPuller(byteSourceFactory, config);
+ AzureDataSegmentPuller puller = new
AzureDataSegmentPuller(byteSourceFactory, azureStorage, config);
puller.getSegmentFiles(CONTAINER_NAME, BLOB_PATH, outDir);
}
@@ -159,7 +159,7 @@ public class AzureDataSegmentPullerTest extends
EasyMockSupport
HttpResponse httpResponse = createMock(HttpResponse.class);
EasyMock.expect(httpResponse.getStatusCode()).andReturn(500).anyTimes();
EasyMock.replay(httpResponse);
- EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME,
BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME,
BLOB_PATH));
+ EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH,
azureStorage)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME,
BLOB_PATH));
EasyMock.expect(azureStorage.getBlockBlobInputStream(0L, CONTAINER_NAME,
BLOB_PATH)).andThrow(
new BlobStorageException("", httpResponse, null)
).atLeastOnce();
@@ -167,7 +167,7 @@ public class AzureDataSegmentPullerTest extends
EasyMockSupport
EasyMock.replay(azureStorage);
EasyMock.replay(byteSourceFactory);
- AzureDataSegmentPuller puller = new
AzureDataSegmentPuller(byteSourceFactory, config);
+ AzureDataSegmentPuller puller = new
AzureDataSegmentPuller(byteSourceFactory, azureStorage, config);
puller.getSegmentFiles(CONTAINER_NAME, BLOB_PATH, outDir);
diff --git
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureIngestClientFactoryTest.java
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureIngestClientFactoryTest.java
new file mode 100644
index 00000000000..d982cf2253e
--- /dev/null
+++
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureIngestClientFactoryTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure;
+
+import com.azure.core.http.policy.AzureSasCredentialPolicy;
+import com.azure.core.http.policy.BearerTokenAuthenticationPolicy;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.azure.AzureStorageAccountInputSourceConfig;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+@RunWith(EasyMockRunner.class)
+public class AzureIngestClientFactoryTest extends EasyMockSupport
+{
+ private AzureIngestClientFactory azureIngestClientFactory;
+ private static final String ACCOUNT = "account";
+ private static final String KEY = "key";
+ private static final String TOKEN = "token";
+
+ @Mock
+ private static AzureAccountConfig accountConfig;
+
+ @Mock
+ private static AzureStorageAccountInputSourceConfig
azureStorageAccountInputSourceConfig;
+
+ @Before
+ public void setup()
+ {
+
EasyMock.expect(accountConfig.getBlobStorageEndpoint()).andReturn("blob.core.windows.net").anyTimes();
+ }
+
+ @Test
+ public void test_blobServiceClient_accountName()
+ {
+ AzureStorageAccountInputSourceConfig azureStorageAccountInputSourceConfig
= new AzureStorageAccountInputSourceConfig(
+ null,
+ KEY,
+ null,
+ null,
+ null
+ );
+ azureIngestClientFactory = new AzureIngestClientFactory(accountConfig,
azureStorageAccountInputSourceConfig);
+ replayAll();
+ BlobServiceClient blobServiceClient =
azureIngestClientFactory.getBlobServiceClient(3, ACCOUNT);
+ verifyAll();
+
+ Assert.assertEquals(ACCOUNT, blobServiceClient.getAccountName());
+ }
+
+ @Test
+ public void test_blobServiceClientBuilder_key() throws MalformedURLException
+ {
+ AzureStorageAccountInputSourceConfig azureStorageAccountInputSourceConfig
= new AzureStorageAccountInputSourceConfig(
+ null,
+ KEY,
+ null,
+ null,
+ null
+ );
+ azureIngestClientFactory = new AzureIngestClientFactory(accountConfig,
azureStorageAccountInputSourceConfig);
+
+ replayAll();
+ BlobServiceClient blobServiceClient =
azureIngestClientFactory.getBlobServiceClient(3, ACCOUNT);
+ verifyAll();
+ StorageSharedKeyCredential storageSharedKeyCredential =
StorageSharedKeyCredential.getSharedKeyCredentialFromPipeline(
+ blobServiceClient.getHttpPipeline()
+ );
+ Assert.assertNotNull(storageSharedKeyCredential);
+
+ // Azure doesn't let us look at the key in the StorageSharedKeyCredential
so make sure the authorization header generated is what we expect.
+ Assert.assertEquals(
+ new StorageSharedKeyCredential(ACCOUNT,
KEY).generateAuthorizationHeader(new URL("http://druid.com"), "POST",
ImmutableMap.of()),
+ storageSharedKeyCredential.generateAuthorizationHeader(new
URL("http://druid.com"), "POST", ImmutableMap.of())
+ );
+ }
+
+ @Test
+ public void test_blobServiceClientBuilder_sasToken()
+ {
+ AzureStorageAccountInputSourceConfig azureStorageAccountInputSourceConfig
= new AzureStorageAccountInputSourceConfig(
+ TOKEN,
+ null,
+ null,
+ null,
+ null
+ );
+ azureIngestClientFactory = new AzureIngestClientFactory(accountConfig,
azureStorageAccountInputSourceConfig);
+ replayAll();
+ BlobServiceClient blobServiceClient =
azureIngestClientFactory.getBlobServiceClient(3, ACCOUNT);
+ verifyAll();
+
+ AzureSasCredentialPolicy azureSasCredentialPolicy = null;
+ for (int i = 0; i < blobServiceClient.getHttpPipeline().getPolicyCount();
i++) {
+ if (blobServiceClient.getHttpPipeline().getPolicy(i) instanceof
AzureSasCredentialPolicy) {
+ azureSasCredentialPolicy = (AzureSasCredentialPolicy)
blobServiceClient.getHttpPipeline().getPolicy(i);
+ }
+ }
+
+ Assert.assertNotNull(azureSasCredentialPolicy);
+ }
+
+ @Test
+ public void test_blobServiceClientBuilder_useAppRegistration()
+ {
+ AzureStorageAccountInputSourceConfig azureStorageAccountInputSourceConfig
= new AzureStorageAccountInputSourceConfig(
+ null,
+ null,
+ "clientId",
+ "clientSecret",
+ "tenantId"
+ );
+ azureIngestClientFactory = new AzureIngestClientFactory(accountConfig,
azureStorageAccountInputSourceConfig);
+ replayAll();
+ BlobServiceClient blobServiceClient =
azureIngestClientFactory.getBlobServiceClient(3, ACCOUNT);
+ verifyAll();
+ BearerTokenAuthenticationPolicy bearerTokenAuthenticationPolicy = null;
+ for (int i = 0; i < blobServiceClient.getHttpPipeline().getPolicyCount();
i++) {
+ if (blobServiceClient.getHttpPipeline().getPolicy(i) instanceof
BearerTokenAuthenticationPolicy) {
+ bearerTokenAuthenticationPolicy = (BearerTokenAuthenticationPolicy)
blobServiceClient.getHttpPipeline().getPolicy(i);
+ }
+ }
+
+ Assert.assertNotNull(bearerTokenAuthenticationPolicy);
+ }
+
+
+ @Test
+ public void
test_blobServiceClientBuilder_useAzureAccountConfig_asDefaultMaxTries()
+ {
+ // We should only call getKey twice (both times in the first call to
getBlobServiceClient)
+
EasyMock.expect(azureStorageAccountInputSourceConfig.getKey()).andReturn(KEY).times(2);
+ azureIngestClientFactory = new AzureIngestClientFactory(accountConfig,
azureStorageAccountInputSourceConfig);
+ EasyMock.expect(accountConfig.getMaxTries()).andReturn(5);
+ replayAll();
+ azureIngestClientFactory.getBlobServiceClient(null, ACCOUNT);
+
+ // should use the cached client and not call getKey
+ azureIngestClientFactory.getBlobServiceClient(5, ACCOUNT);
+
+ // should use the cached client and not call getKey
+ azureIngestClientFactory.getBlobServiceClient(5, ACCOUNT);
+
+ verifyAll();
+ }
+
+ @Test
+ public void test_blobServiceClientBuilder_fallbackToAzureAccountConfig()
+ {
+ AzureStorageAccountInputSourceConfig azureStorageAccountInputSourceConfig
= new AzureStorageAccountInputSourceConfig(
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+ azureIngestClientFactory = new AzureIngestClientFactory(accountConfig,
azureStorageAccountInputSourceConfig);
+ EasyMock.expect(accountConfig.getKey()).andReturn(KEY).times(2);
+ replayAll();
+ azureIngestClientFactory.getBlobServiceClient(5, ACCOUNT);
+ verifyAll();
+ }
+}
diff --git
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java
index 27d02cd2354..ba69591f6c8 100644
---
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java
+++
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java
@@ -30,6 +30,8 @@ import com.google.inject.Module;
import com.google.inject.ProvisionException;
import com.google.inject.TypeLiteral;
import org.apache.druid.data.input.azure.AzureEntityFactory;
+import org.apache.druid.data.input.azure.AzureInputSource;
+import org.apache.druid.data.input.azure.AzureStorageAccountInputSource;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.guice.DruidGuiceExtensions;
import org.apache.druid.guice.JsonConfigurator;
@@ -63,12 +65,13 @@ public class AzureStorageDruidModuleTest extends
EasyMockSupport
private static final String AZURE_CONTAINER;
private static final String AZURE_PREFIX;
private static final int AZURE_MAX_LISTING_LENGTH;
- private static final String PATH = "path";
+ private static final String PATH = "path/subpath";
private static final Iterable<URI> EMPTY_PREFIXES_ITERABLE =
ImmutableList.of();
private static final Properties PROPERTIES;
private CloudObjectLocation cloudObjectLocation1;
private CloudObjectLocation cloudObjectLocation2;
+ private AzureStorage azureStorage;
private Injector injector;
static {
@@ -93,6 +96,7 @@ public class AzureStorageDruidModuleTest extends
EasyMockSupport
{
cloudObjectLocation1 = createMock(CloudObjectLocation.class);
cloudObjectLocation2 = createMock(CloudObjectLocation.class);
+ azureStorage = createMock(AzureStorage.class);
}
@Test
@@ -143,8 +147,8 @@ public class AzureStorageDruidModuleTest extends
EasyMockSupport
{
injector = makeInjectorWithProperties(PROPERTIES);
AzureByteSourceFactory factory =
injector.getInstance(AzureByteSourceFactory.class);
- Object object1 = factory.create("container1", "blob1");
- Object object2 = factory.create("container2", "blob2");
+ Object object1 = factory.create("container1", "blob1", azureStorage);
+ Object object2 = factory.create("container2", "blob2", azureStorage);
Assert.assertNotNull(object1);
Assert.assertNotNull(object2);
Assert.assertNotSame(object1, object2);
@@ -155,17 +159,20 @@ public class AzureStorageDruidModuleTest extends
EasyMockSupport
{
EasyMock.expect(cloudObjectLocation1.getBucket()).andReturn(AZURE_CONTAINER);
EasyMock.expect(cloudObjectLocation2.getBucket()).andReturn(AZURE_CONTAINER);
- EasyMock.expect(cloudObjectLocation1.getPath()).andReturn(PATH);
+ EasyMock.expect(cloudObjectLocation1.getPath()).andReturn(PATH).times(2);
EasyMock.expect(cloudObjectLocation2.getPath()).andReturn(PATH);
replayAll();
injector = makeInjectorWithProperties(PROPERTIES);
AzureEntityFactory factory =
injector.getInstance(AzureEntityFactory.class);
- Object object1 = factory.create(cloudObjectLocation1);
- Object object2 = factory.create(cloudObjectLocation2);
+ Object object1 = factory.create(cloudObjectLocation1, azureStorage,
AzureInputSource.SCHEME);
+ Object object2 = factory.create(cloudObjectLocation2, azureStorage,
AzureInputSource.SCHEME);
+ Object object3 = factory.create(cloudObjectLocation1, azureStorage,
AzureStorageAccountInputSource.SCHEME);
Assert.assertNotNull(object1);
Assert.assertNotNull(object2);
+ Assert.assertNotNull(object3);
Assert.assertNotSame(object1, object2);
+ Assert.assertNotSame(object1, object3);
}
@Test
@@ -173,8 +180,8 @@ public class AzureStorageDruidModuleTest extends
EasyMockSupport
{
injector = makeInjectorWithProperties(PROPERTIES);
AzureCloudBlobIteratorFactory factory =
injector.getInstance(AzureCloudBlobIteratorFactory.class);
- Object object1 = factory.create(EMPTY_PREFIXES_ITERABLE, 10);
- Object object2 = factory.create(EMPTY_PREFIXES_ITERABLE, 10);
+ Object object1 = factory.create(EMPTY_PREFIXES_ITERABLE, 10, azureStorage);
+ Object object2 = factory.create(EMPTY_PREFIXES_ITERABLE, 10, azureStorage);
Assert.assertNotNull(object1);
Assert.assertNotNull(object2);
Assert.assertNotSame(object1, object2);
@@ -185,8 +192,8 @@ public class AzureStorageDruidModuleTest extends
EasyMockSupport
{
injector = makeInjectorWithProperties(PROPERTIES);
AzureCloudBlobIterableFactory factory =
injector.getInstance(AzureCloudBlobIterableFactory.class);
- AzureCloudBlobIterable object1 = factory.create(EMPTY_PREFIXES_ITERABLE,
10);
- AzureCloudBlobIterable object2 = factory.create(EMPTY_PREFIXES_ITERABLE,
10);
+ AzureCloudBlobIterable object1 = factory.create(EMPTY_PREFIXES_ITERABLE,
10, azureStorage);
+ AzureCloudBlobIterable object2 = factory.create(EMPTY_PREFIXES_ITERABLE,
10, azureStorage);
Assert.assertNotNull(object1);
Assert.assertNotNull(object2);
Assert.assertNotSame(object1, object2);
diff --git
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
index c31dedf2191..65acc9346a6 100644
---
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
+++
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
@@ -49,6 +49,7 @@ public class AzureStorageTest
BlobContainerClient blobContainerClient =
Mockito.mock(BlobContainerClient.class);
AzureClientFactory azureClientFactory =
Mockito.mock(AzureClientFactory.class);
+ private final String STORAGE_ACCOUNT = "storageAccount";
private final String CONTAINER = "container";
private final String BLOB_NAME = "blobName";
private final Integer MAX_ATTEMPTS = 3;
@@ -56,7 +57,7 @@ public class AzureStorageTest
@Before
public void setup() throws BlobStorageException
{
- azureStorage = new AzureStorage(azureClientFactory);
+ azureStorage = new AzureStorage(azureClientFactory, STORAGE_ACCOUNT);
}
@Test
@@ -71,7 +72,7 @@ public class AzureStorageTest
ArgumentMatchers.any()
);
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
-
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(MAX_ATTEMPTS);
+
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(MAX_ATTEMPTS,
STORAGE_ACCOUNT);
Assert.assertEquals(ImmutableList.of(BLOB_NAME),
azureStorage.listDir(CONTAINER, "", MAX_ATTEMPTS));
}
@@ -88,11 +89,35 @@ public class AzureStorageTest
ArgumentMatchers.any()
);
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
-
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null);
+
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null,
STORAGE_ACCOUNT);
Assert.assertEquals(ImmutableList.of(BLOB_NAME),
azureStorage.listDir(CONTAINER, "", null));
}
+ @Test
+ public void testListBlobsWithPrefixInContainerSegmented() throws
BlobStorageException
+ {
+ String storageAccountCustom = "customStorageAccount";
+ BlobItem blobItem = new BlobItem().setName(BLOB_NAME).setProperties(new
BlobItemProperties().setContentLength(10L));
+ SettableSupplier<PagedResponse<BlobItem>> supplier = new
SettableSupplier<>();
+ supplier.set(new TestPagedResponse<>(ImmutableList.of(blobItem)));
+ PagedIterable<BlobItem> pagedIterable = new PagedIterable<>(supplier);
+ Mockito.doReturn(pagedIterable).when(blobContainerClient).listBlobs(
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ );
+
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
+
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(3,
storageAccountCustom);
+
+ azureStorage.listBlobsWithPrefixInContainerSegmented(
+ storageAccountCustom,
+ CONTAINER,
+ "",
+ 1,
+ 3
+ );
+ }
+
@Test
public void testBatchDeleteFiles_emptyResponse() throws BlobStorageException
{
@@ -107,7 +132,7 @@ public class AzureStorageTest
Mockito.doReturn(containerUrl).when(blobContainerClient).getBlobContainerUrl();
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
-
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null);
+
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null,
STORAGE_ACCOUNT);
Mockito.doReturn(blobBatchClient).when(azureClientFactory).getBlobBatchClient(blobContainerClient);
Mockito.doReturn(pagedIterable).when(blobBatchClient).deleteBlobs(
captor.capture(),
ArgumentMatchers.eq(DeleteSnapshotsOptionType.INCLUDE)
diff --git
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
index 3ce9d8fb3b2..92010952802 100644
---
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
+++
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
@@ -533,7 +533,9 @@ public class AzureTaskLogsTest extends EasyMockSupport
azureCloudBlobIterableFactory,
MAX_KEYS,
PREFIX_URI,
- ImmutableList.of(object1, object2));
+ ImmutableList.of(object1, object2),
+ azureStorage
+ );
EasyMock.replay(object1, object2);
AzureTestUtils.expectDeleteObjects(
@@ -564,7 +566,8 @@ public class AzureTaskLogsTest extends EasyMockSupport
azureCloudBlobIterableFactory,
MAX_KEYS,
PREFIX_URI,
- ImmutableList.of(object1)
+ ImmutableList.of(object1),
+ azureStorage
);
EasyMock.replay(object1);
@@ -612,7 +615,9 @@ public class AzureTaskLogsTest extends EasyMockSupport
azureCloudBlobIterableFactory,
MAX_KEYS,
PREFIX_URI,
- ImmutableList.of(object1, object2));
+ ImmutableList.of(object1, object2),
+ azureStorage
+ );
EasyMock.replay(object1, object2);
AzureTestUtils.expectDeleteObjects(
@@ -642,7 +647,8 @@ public class AzureTaskLogsTest extends EasyMockSupport
azureCloudBlobIterableFactory,
MAX_KEYS,
PREFIX_URI,
- ImmutableList.of(object1)
+ ImmutableList.of(object1),
+ azureStorage
);
EasyMock.replay(object1);
diff --git
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTestUtils.java
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTestUtils.java
index 67ec5b8e58d..e6c048dbdd2 100644
---
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTestUtils.java
+++
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTestUtils.java
@@ -57,11 +57,13 @@ public class AzureTestUtils extends EasyMockSupport
AzureCloudBlobIterableFactory azureCloudBlobIterableFactory,
int maxListingLength,
URI PREFIX_URI,
- List<CloudBlobHolder> objects)
+ List<CloudBlobHolder> objects,
+ AzureStorage azureStorage
+ )
{
AzureCloudBlobIterable azureCloudBlobIterable =
EasyMock.createMock(AzureCloudBlobIterable.class);
EasyMock.expect(azureCloudBlobIterable.iterator()).andReturn(objects.iterator());
-
EasyMock.expect(azureCloudBlobIterableFactory.create(ImmutableList.of(PREFIX_URI),
maxListingLength)).andReturn(azureCloudBlobIterable);
+
EasyMock.expect(azureCloudBlobIterableFactory.create(ImmutableList.of(PREFIX_URI),
maxListingLength, azureStorage)).andReturn(azureCloudBlobIterable);
return azureCloudBlobIterable;
}
diff --git a/website/.spelling b/website/.spelling
index 56ad2a69217..e878cd2f9a8 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -2378,6 +2378,9 @@ markUnused
markUsed
segmentId
aggregateMultipleValues
+appRegistrationClientId
+appRegistrationClientSecret
+tenantId
relativeError
ddSketch
DDSketch
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]