turcsanyip commented on a change in pull request #4430:
URL: https://github.com/apache/nifi/pull/4430#discussion_r462330562
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService.java
##########
@@ -77,16 +78,37 @@
final String accountKey =
validationContext.getProperty(AzureStorageUtils.ACCOUNT_KEY).getValue();
final String sasToken =
validationContext.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).getValue();
- if (StringUtils.isBlank(accountKey) && StringUtils.isBlank(sasToken)) {
+ final Boolean accountKeyIsSet =
validationContext.getProperty(AzureStorageUtils.ACCOUNT_KEY).isSet();
+ final Boolean sasTokenIsSet =
validationContext.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).isSet();
+
+ if (StringUtils.isAllBlank(accountKey, sasToken)) {
results.add(new
ValidationResult.Builder().subject("AzureStorageCredentialsControllerService")
.valid(false)
.explanation("either " +
AzureStorageUtils.ACCOUNT_KEY.getDisplayName() + " or " +
AzureStorageUtils.PROP_SAS_TOKEN.getDisplayName() + " is required")
.build());
- } else if (StringUtils.isNotBlank(accountKey) &&
StringUtils.isNotBlank(sasToken)) {
- results.add(new
ValidationResult.Builder().subject("AzureStorageCredentialsControllerService")
+ }
+
+ if (BooleanUtils.xor(new Boolean[] { accountKeyIsSet, sasTokenIsSet
})) {
+ if (accountKeyIsSet) {
+ if (StringUtils.isBlank(accountKey)) {
+ results.add(new
ValidationResult.Builder().subject("AzureStorageCredentialsControllerService")
.valid(false)
- .explanation("cannot set both " +
AzureStorageUtils.ACCOUNT_KEY.getDisplayName() + " and " +
AzureStorageUtils.PROP_SAS_TOKEN.getDisplayName())
+
.explanation(AzureStorageUtils.ACCOUNT_KEY.getDisplayName() + " must be set
when using Account Key authentication.")
.build());
+ }
+ } else if (sasTokenIsSet) {
+ if (StringUtils.isBlank(sasToken)) {
+ results.add(new
ValidationResult.Builder().subject("AzureStorageCredentialsControllerService")
+ .valid(false)
+
.explanation(AzureStorageUtils.PROP_SAS_TOKEN.getDisplayName() + " must be set
when using SAS token authentication.")
+ .build());
+ }
Review comment:
I believe these two cases should be handled via
`StandardValidators.NON_BLANK_VALIDATOR` directly on the properties, which would
be less complicated / more straightforward.
Nevertheless, it is a good point to check blank values and the current
`NON_EMPTY_VALIDATOR` is not the best option.
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java
##########
@@ -147,18 +154,40 @@ private void configureControllerService(String
accountName, String accountKey, S
processContext.setProperty(AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE,
CREDENTIALS_SERVICE_VALUE);
}
- private void
assertStorageCredentialsDetailsAccountNameAndAccountKey(AzureStorageCredentialsDetails
storageCredentialsDetails) {
+ private void assertStorageCredentialsDetailsAccountNameAndAccountKey(
+ AzureStorageCredentialsDetails storageCredentialsDetails) {
+
assertEquals(ACCOUNT_NAME_VALUE,
storageCredentialsDetails.getStorageAccountName());
- assertTrue(storageCredentialsDetails.getStorageCredentials()
instanceof StorageCredentialsAccountAndKey);
- StorageCredentialsAccountAndKey storageCredentials =
(StorageCredentialsAccountAndKey)
storageCredentialsDetails.getStorageCredentials();
+ assertTrue(storageCredentialsDetails.getCredentialType() ==
AzureStorageCredentialsDetails.CredentialType.STORAGE_ACCOUNT_KEY);
+ assertTrue(storageCredentialsDetails.getStorageSharedKeyCredential()
instanceof StorageSharedKeyCredential);
+
+ // test credential object
+ StorageSharedKeyCredential storageCredentials =
(StorageSharedKeyCredential) storageCredentialsDetails
Review comment:
Unnecessary type cast.
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
##########
@@ -176,45 +174,38 @@ protected String getDefaultTimePrecision() {
protected List<BlobInfo> performListing(final ProcessContext context,
final Long minTimestamp) throws IOException {
String containerName =
context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
String prefix =
context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue();
- if (prefix == null) {
- prefix = "";
- }
+
final List<BlobInfo> listing = new ArrayList<>();
try {
- CloudBlobClient blobClient =
AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
- CloudBlobContainer container =
blobClient.getContainerReference(containerName);
+ BlobServiceClient blobServiceClient =
AzureStorageUtils.createBlobServiceClient(context, null);
+ BlobContainerClient blobContainerClient =
blobServiceClient.getBlobContainerClient(containerName);
- final OperationContext operationContext = new OperationContext();
- AzureStorageUtils.setProxy(operationContext, context);
+ final ListBlobsOptions listBlobsOptions = new ListBlobsOptions()
+ .setPrefix(prefix)
+ .setDetails(new BlobListDetails()
+ .setRetrieveMetadata(true));
- for (ListBlobItem blob : container.listBlobs(prefix, true,
EnumSet.of(BlobListingDetails.METADATA), null, operationContext)) {
- if (blob instanceof CloudBlob) {
- CloudBlob cloudBlob = (CloudBlob) blob;
- BlobProperties properties = cloudBlob.getProperties();
- StorageUri uri =
cloudBlob.getSnapshotQualifiedStorageUri();
+ blobContainerClient.listBlobs().forEach(blob -> {
+ if (blob instanceof BlobItem) {
+ BlobItem blobItem = (BlobItem) blob;
Review comment:
Unnecessary type check / type cast. `listBlobs()` returns
`PagedIterable<BlobItem>`.
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureProxyUtils.java
##########
@@ -0,0 +1,115 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.util.Collection;
+
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
+import org.apache.nifi.util.StringUtils;
+
+public class AzureProxyUtils {
+ private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP,
ProxySpec.SOCKS};
+
+ private static ProxyOptions.Type getProxyOptionsTypeFromProxyType(final
Proxy.Type proxyType) {
+ for (final ProxyOptions.Type item : ProxyOptions.Type.values()) {
+ if (item.toProxyType() == proxyType) {
+ return item;
+ }
+ }
+ return null;
+ }
+
+ public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE =
ProxyConfiguration
+ .createProxyConfigPropertyDescriptor(false, PROXY_SPECS);
+
+ public static HttpClient createHttpClient(final PropertyContext
propertyContext) {
+ final ProxyConfiguration proxyConfig =
ProxyConfiguration.getConfiguration(propertyContext);
+ final ProxyOptions proxyOptions = getProxyOptions(proxyConfig);
+
+ final HttpClient client = new NettyAsyncHttpClientBuilder()
+ .proxy(proxyOptions)
+ .build();
+
+ return client;
+ }
+
+ public static void validateProxySpec(final ValidationContext context,
final Collection<ValidationResult> results) {
+ final ProxyConfiguration proxyConfig =
ProxyConfiguration.getConfiguration(context);
+
+ final String proxyServerHost = proxyConfig.getProxyServerHost();
+ final Integer proxyServerPort = proxyConfig.getProxyServerPort();
+ final String proxyServerUser = proxyConfig.getProxyUserName();
+ final String proxyServerPassword = proxyConfig.getProxyUserPassword();
+
+ if ((StringUtils.isNotBlank(proxyServerHost) && proxyServerPort ==
null)
+ || (StringUtils.isBlank(proxyServerHost) && proxyServerPort !=
null)) {
+ results.add(new
ValidationResult.Builder().subject("AzureProxyUtils Details").valid(false)
+ .explanation(
+ "When specifying address information, both `host`
and `port` information must be provided.")
Review comment:
We use "normal" apostrophes (') in UI messages instead of (`).
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
##########
@@ -180,21 +198,27 @@ public static AzureStorageCredentialsDetails
createStorageCredentialsDetails(Pro
final String accountKey =
context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(attributes).getValue();
final String sasToken =
context.getProperty(PROP_SAS_TOKEN).evaluateAttributeExpressions(attributes).getValue();
+ AzureStorageCredentialsDetails azureStorageCredentialsDetails;
+
if (StringUtils.isBlank(accountName)) {
throw new IllegalArgumentException(String.format("'%s' must not be
empty.", ACCOUNT_NAME.getDisplayName()));
}
- StorageCredentials storageCredentials;
+ if (StringUtils.isAllBlank(accountKey, sasToken)) {
+ throw new IllegalArgumentException(String.format("Either '%s' or
'%s' must be defined.", ACCOUNT_KEY.getDisplayName(),
+
PROP_SAS_TOKEN.getDisplayName()));
Review comment:
It is checked in the `else` branch below. Seems to be duplicated here.
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
##########
@@ -180,21 +198,27 @@ public static AzureStorageCredentialsDetails
createStorageCredentialsDetails(Pro
final String accountKey =
context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(attributes).getValue();
final String sasToken =
context.getProperty(PROP_SAS_TOKEN).evaluateAttributeExpressions(attributes).getValue();
+ AzureStorageCredentialsDetails azureStorageCredentialsDetails;
+
if (StringUtils.isBlank(accountName)) {
throw new IllegalArgumentException(String.format("'%s' must not be
empty.", ACCOUNT_NAME.getDisplayName()));
}
- StorageCredentials storageCredentials;
+ if (StringUtils.isAllBlank(accountKey, sasToken)) {
+ throw new IllegalArgumentException(String.format("Either '%s' or
'%s' must be defined.", ACCOUNT_KEY.getDisplayName(),
+
PROP_SAS_TOKEN.getDisplayName()));
+ }
- if (StringUtils.isNotBlank(accountKey)) {
- storageCredentials = new
StorageCredentialsAccountAndKey(accountName, accountKey);
+ if(StringUtils.isNotBlank(accountKey)) {
Review comment:
Minor: `if (`
Please use a formatter (default settings of IntelliJ / Eclipse should be
fine).
##########
File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
##########
@@ -77,8 +76,19 @@
<version>${azure-eventhubs-eph.version}</version>
</dependency>
<dependency>
- <groupId>com.microsoft.azure</groupId>
- <artifactId>azure-storage</artifactId>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-storage-blob</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-storage-queue</artifactId>
+ <version>${azure-storage-queue.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ </exclusions>
Review comment:
Could you please define it in the parent pom's `dependencyManagement`
section? (similar to the blob dependency)
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
##########
@@ -146,24 +142,46 @@ private AzureStorageUtils() {
}
/**
- * Create CloudBlobClient instance.
+ * Create BlobServiceClient instance.
* @param flowFile An incoming FlowFile can be used for NiFi Expression
Language evaluation to derive
* Account Name, Account Key or SAS Token. This can be
null if not available.
*/
- public static CloudBlobClient createCloudBlobClient(ProcessContext
context, ComponentLog logger, FlowFile flowFile) throws URISyntaxException {
+ public static BlobServiceClient createBlobServiceClient(PropertyContext
context, FlowFile flowFile) {
final AzureStorageCredentialsDetails storageCredentialsDetails =
getStorageCredentialsDetails(context, flowFile);
- final CloudStorageAccount cloudStorageAccount = new
CloudStorageAccount(
- storageCredentialsDetails.getStorageCredentials(),
- true,
- storageCredentialsDetails.getStorageSuffix(),
- storageCredentialsDetails.getStorageAccountName());
- final CloudBlobClient cloudBlobClient =
cloudStorageAccount.createCloudBlobClient();
-
- return cloudBlobClient;
+
+ final String storageSuffix =
StringUtils.isNotBlank(storageCredentialsDetails.getStorageSuffix())
+ ? storageCredentialsDetails.getStorageSuffix()
+ : "blob.core.windows.net";
+ final String endpoint = String.format("https://%s.%s",
storageCredentialsDetails.getStorageAccountName(),
+ storageSuffix);
+
+ // use HttpClient object to allow proxy setting
+ final HttpClient httpClient =
AzureProxyUtils.createHttpClient(context);
+ final BlobServiceClientBuilder blobServiceClientBuilder = new
BlobServiceClientBuilder()
+
.endpoint(endpoint)
+
.httpClient(httpClient);
+ BlobServiceClient blobServiceClient;
+
+ switch(storageCredentialsDetails.getCredentialType()) {
+ case SAS_TOKEN:
+ blobServiceClient =
blobServiceClientBuilder.sasToken(storageCredentialsDetails.getSasToken())
+ .buildClient();
+ break;
+ case STORAGE_ACCOUNT_KEY:
+ blobServiceClient =
blobServiceClientBuilder.credential(storageCredentialsDetails.getStorageSharedKeyCredential())
+ .buildClient();
+ break;
+ default:
+ throw new IllegalArgumentException(String.format("Invalid
credential type '%s'!",
storageCredentialsDetails.getCredentialType().toString()));
+ }
+
+ return blobServiceClient;
}
public static AzureStorageCredentialsDetails
getStorageCredentialsDetails(PropertyContext context, FlowFile flowFile) {
- final Map<String, String> attributes = flowFile != null ?
flowFile.getAttributes() : Collections.emptyMap();
+ final Map<String, String> attributes = flowFile != null
+ ? flowFile.getAttributes()
+ : Collections.emptyMap();
Review comment:
Please try to avoid reformatting existing code (assuming it was
formatted properly).
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
##########
@@ -146,24 +142,46 @@ private AzureStorageUtils() {
}
/**
- * Create CloudBlobClient instance.
+ * Create BlobServiceClient instance.
* @param flowFile An incoming FlowFile can be used for NiFi Expression
Language evaluation to derive
* Account Name, Account Key or SAS Token. This can be
null if not available.
*/
- public static CloudBlobClient createCloudBlobClient(ProcessContext
context, ComponentLog logger, FlowFile flowFile) throws URISyntaxException {
+ public static BlobServiceClient createBlobServiceClient(PropertyContext
context, FlowFile flowFile) {
final AzureStorageCredentialsDetails storageCredentialsDetails =
getStorageCredentialsDetails(context, flowFile);
- final CloudStorageAccount cloudStorageAccount = new
CloudStorageAccount(
- storageCredentialsDetails.getStorageCredentials(),
- true,
- storageCredentialsDetails.getStorageSuffix(),
- storageCredentialsDetails.getStorageAccountName());
- final CloudBlobClient cloudBlobClient =
cloudStorageAccount.createCloudBlobClient();
-
- return cloudBlobClient;
+
+ final String storageSuffix =
StringUtils.isNotBlank(storageCredentialsDetails.getStorageSuffix())
+ ? storageCredentialsDetails.getStorageSuffix()
+ : "blob.core.windows.net";
+ final String endpoint = String.format("https://%s.%s",
storageCredentialsDetails.getStorageAccountName(),
+ storageSuffix);
+
+ // use HttpClient object to allow proxy setting
+ final HttpClient httpClient =
AzureProxyUtils.createHttpClient(context);
+ final BlobServiceClientBuilder blobServiceClientBuilder = new
BlobServiceClientBuilder()
+
.endpoint(endpoint)
+
.httpClient(httpClient);
+ BlobServiceClient blobServiceClient;
+
+ switch(storageCredentialsDetails.getCredentialType()) {
Review comment:
Minor: `switch (`
##########
File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml
##########
@@ -25,13 +25,12 @@
<dependencies>
<dependency>
- <groupId>com.microsoft.azure</groupId>
- <artifactId>azure-storage</artifactId>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-storage-blob</artifactId>
Review comment:
The `azure-storage-common` dependency would be sufficient in the services
API module.
Currently it comes in as a transitive dependency of `azure-storage-blob`, but
the blob artifact itself is not needed here.
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureProxyUtils.java
##########
@@ -0,0 +1,115 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.util.Collection;
+
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
+import org.apache.nifi.util.StringUtils;
+
+public class AzureProxyUtils {
+ private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP,
ProxySpec.SOCKS};
+
+ private static ProxyOptions.Type getProxyOptionsTypeFromProxyType(final
Proxy.Type proxyType) {
+ for (final ProxyOptions.Type item : ProxyOptions.Type.values()) {
+ if (item.toProxyType() == proxyType) {
+ return item;
+ }
+ }
+ return null;
+ }
+
+ public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE =
ProxyConfiguration
+ .createProxyConfigPropertyDescriptor(false, PROXY_SPECS);
+
+ public static HttpClient createHttpClient(final PropertyContext
propertyContext) {
+ final ProxyConfiguration proxyConfig =
ProxyConfiguration.getConfiguration(propertyContext);
+ final ProxyOptions proxyOptions = getProxyOptions(proxyConfig);
+
+ final HttpClient client = new NettyAsyncHttpClientBuilder()
+ .proxy(proxyOptions)
+ .build();
+
+ return client;
+ }
+
+ public static void validateProxySpec(final ValidationContext context,
final Collection<ValidationResult> results) {
+ final ProxyConfiguration proxyConfig =
ProxyConfiguration.getConfiguration(context);
+
+ final String proxyServerHost = proxyConfig.getProxyServerHost();
+ final Integer proxyServerPort = proxyConfig.getProxyServerPort();
+ final String proxyServerUser = proxyConfig.getProxyUserName();
+ final String proxyServerPassword = proxyConfig.getProxyUserPassword();
+
+ if ((StringUtils.isNotBlank(proxyServerHost) && proxyServerPort ==
null)
+ || (StringUtils.isBlank(proxyServerHost) && proxyServerPort !=
null)) {
+ results.add(new
ValidationResult.Builder().subject("AzureProxyUtils Details").valid(false)
+ .explanation(
+ "When specifying address information, both `host`
and `port` information must be provided.")
+ .build());
+ }
+
+ if ((StringUtils.isBlank(proxyServerUser) &&
StringUtils.isNotBlank(proxyServerPassword))
+ || (StringUtils.isNotBlank(proxyServerUser) &&
StringUtils.isBlank(proxyServerPassword))) {
+ results.add(new
ValidationResult.Builder().subject("AzureProxyUtils Details").valid(false)
+ .explanation(
+ "When specifying credentials, both `user` and
`password` must be provided.")
+ .build());
+ }
+
+ ProxyConfiguration.validateProxySpec(context, results, PROXY_SPECS);
+ }
+
+ public static ProxyOptions getProxyOptions(final ProxyConfiguration
proxyConfig) {
+ final String proxyServerHost = proxyConfig.getProxyServerHost();
+ final Integer proxyServerPort = proxyConfig.getProxyServerPort();
+ final String proxyServerUser = proxyConfig.getProxyUserName();
+ final String proxyServerPassword = proxyConfig.getProxyUserPassword();
+
+ final Boolean proxyServerProvided =
StringUtils.isNotBlank(proxyServerHost) && proxyServerPort != null;
+ final Boolean proxyCredentialsProvided =
StringUtils.isNotBlank(proxyServerUser) &&
StringUtils.isNotBlank(proxyServerPassword);
Review comment:
Variables can be primitive `boolean`.
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
##########
@@ -176,45 +174,38 @@ protected String getDefaultTimePrecision() {
protected List<BlobInfo> performListing(final ProcessContext context,
final Long minTimestamp) throws IOException {
String containerName =
context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
String prefix =
context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue();
- if (prefix == null) {
- prefix = "";
- }
+
final List<BlobInfo> listing = new ArrayList<>();
try {
- CloudBlobClient blobClient =
AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
- CloudBlobContainer container =
blobClient.getContainerReference(containerName);
+ BlobServiceClient blobServiceClient =
AzureStorageUtils.createBlobServiceClient(context, null);
+ BlobContainerClient blobContainerClient =
blobServiceClient.getBlobContainerClient(containerName);
- final OperationContext operationContext = new OperationContext();
- AzureStorageUtils.setProxy(operationContext, context);
+ final ListBlobsOptions listBlobsOptions = new ListBlobsOptions()
+ .setPrefix(prefix)
+ .setDetails(new BlobListDetails()
+ .setRetrieveMetadata(true));
- for (ListBlobItem blob : container.listBlobs(prefix, true,
EnumSet.of(BlobListingDetails.METADATA), null, operationContext)) {
- if (blob instanceof CloudBlob) {
- CloudBlob cloudBlob = (CloudBlob) blob;
- BlobProperties properties = cloudBlob.getProperties();
- StorageUri uri =
cloudBlob.getSnapshotQualifiedStorageUri();
+ blobContainerClient.listBlobs().forEach(blob -> {
+ if (blob instanceof BlobItem) {
+ BlobItem blobItem = (BlobItem) blob;
+ BlobItemProperties properties = blobItem.getProperties();
+ BlobClient blobClient =
blobContainerClient.getBlobClient(blobItem.getName());
+ String uri = blobClient.getBlobUrl();
Builder builder = new BlobInfo.Builder()
-
.primaryUri(uri.getPrimaryUri().toString())
- .blobName(cloudBlob.getName())
- .containerName(containerName)
+ .primaryUri(uri)
+ .blobName(blobItem.getName())
+
.blobType(properties.getBlobType().toString())
Review comment:
Minor backward incompatibility issue: `Block` (orig) vs `BlockBlob` (new)
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
##########
@@ -176,45 +174,38 @@ protected String getDefaultTimePrecision() {
protected List<BlobInfo> performListing(final ProcessContext context,
final Long minTimestamp) throws IOException {
String containerName =
context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
String prefix =
context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue();
- if (prefix == null) {
- prefix = "";
- }
+
final List<BlobInfo> listing = new ArrayList<>();
try {
- CloudBlobClient blobClient =
AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
- CloudBlobContainer container =
blobClient.getContainerReference(containerName);
+ BlobServiceClient blobServiceClient =
AzureStorageUtils.createBlobServiceClient(context, null);
+ BlobContainerClient blobContainerClient =
blobServiceClient.getBlobContainerClient(containerName);
- final OperationContext operationContext = new OperationContext();
- AzureStorageUtils.setProxy(operationContext, context);
+ final ListBlobsOptions listBlobsOptions = new ListBlobsOptions()
+ .setPrefix(prefix)
+ .setDetails(new BlobListDetails()
+ .setRetrieveMetadata(true));
- for (ListBlobItem blob : container.listBlobs(prefix, true,
EnumSet.of(BlobListingDetails.METADATA), null, operationContext)) {
- if (blob instanceof CloudBlob) {
- CloudBlob cloudBlob = (CloudBlob) blob;
- BlobProperties properties = cloudBlob.getProperties();
- StorageUri uri =
cloudBlob.getSnapshotQualifiedStorageUri();
+ blobContainerClient.listBlobs().forEach(blob -> {
+ if (blob instanceof BlobItem) {
+ BlobItem blobItem = (BlobItem) blob;
+ BlobItemProperties properties = blobItem.getProperties();
+ BlobClient blobClient =
blobContainerClient.getBlobClient(blobItem.getName());
+ String uri = blobClient.getBlobUrl();
Builder builder = new BlobInfo.Builder()
-
.primaryUri(uri.getPrimaryUri().toString())
- .blobName(cloudBlob.getName())
- .containerName(containerName)
+ .primaryUri(uri)
Review comment:
The same URL encoding issue as in the case of the Put processor.
It also affects the `filename` attribute of the outgoing FlowFile, which would
be calculated as:
`primaryUri.substring(primaryUri.lastIndexOf('/') + 1);`
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
##########
@@ -37,17 +28,22 @@
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.proxy.ProxyConfiguration;
-import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+
public final class AzureStorageUtils {
+
public static final String BLOCK = "Block";
public static final String PAGE = "Page";
+ public static final String APPEND = "Append";
Review comment:
What is the point of these constants?
The existing ones are not used either (as far as I can see).
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
##########
@@ -146,24 +142,46 @@ private AzureStorageUtils() {
}
/**
- * Create CloudBlobClient instance.
+ * Create BlobServiceClient instance.
* @param flowFile An incoming FlowFile can be used for NiFi Expression
Language evaluation to derive
* Account Name, Account Key or SAS Token. This can be
null if not available.
*/
- public static CloudBlobClient createCloudBlobClient(ProcessContext
context, ComponentLog logger, FlowFile flowFile) throws URISyntaxException {
+ public static BlobServiceClient createBlobServiceClient(PropertyContext
context, FlowFile flowFile) {
final AzureStorageCredentialsDetails storageCredentialsDetails =
getStorageCredentialsDetails(context, flowFile);
- final CloudStorageAccount cloudStorageAccount = new
CloudStorageAccount(
- storageCredentialsDetails.getStorageCredentials(),
- true,
- storageCredentialsDetails.getStorageSuffix(),
- storageCredentialsDetails.getStorageAccountName());
- final CloudBlobClient cloudBlobClient =
cloudStorageAccount.createCloudBlobClient();
-
- return cloudBlobClient;
+
+ final String storageSuffix =
StringUtils.isNotBlank(storageCredentialsDetails.getStorageSuffix())
+ ? storageCredentialsDetails.getStorageSuffix()
+ : "blob.core.windows.net";
Review comment:
Does it work for Queue Storage as well?
The "official" suffix would be `queue.core.windows.net` for queues.
With my storage account I get back different IPs with nslookup but they
might point to the same place behind the scenes...
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java
##########
@@ -147,18 +154,40 @@ private void configureControllerService(String
accountName, String accountKey, S
processContext.setProperty(AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE,
CREDENTIALS_SERVICE_VALUE);
}
- private void
assertStorageCredentialsDetailsAccountNameAndAccountKey(AzureStorageCredentialsDetails
storageCredentialsDetails) {
+ private void assertStorageCredentialsDetailsAccountNameAndAccountKey(
+ AzureStorageCredentialsDetails storageCredentialsDetails) {
+
assertEquals(ACCOUNT_NAME_VALUE,
storageCredentialsDetails.getStorageAccountName());
- assertTrue(storageCredentialsDetails.getStorageCredentials()
instanceof StorageCredentialsAccountAndKey);
- StorageCredentialsAccountAndKey storageCredentials =
(StorageCredentialsAccountAndKey)
storageCredentialsDetails.getStorageCredentials();
+ assertTrue(storageCredentialsDetails.getCredentialType() ==
AzureStorageCredentialsDetails.CredentialType.STORAGE_ACCOUNT_KEY);
+ assertTrue(storageCredentialsDetails.getStorageSharedKeyCredential()
instanceof StorageSharedKeyCredential);
Review comment:
It is always true, the assert has no real effect.
Also in `assertStorageCredentialsDetailsAccountNameAndSasToken`.
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java
##########
@@ -48,20 +48,17 @@
@InputRequirement(Requirement.INPUT_REQUIRED)
public class DeleteAzureBlobStorage extends AbstractAzureBlobProcessor {
- private static final AllowableValue DELETE_SNAPSHOTS_NONE = new
AllowableValue(DeleteSnapshotsOption.NONE.name(), "None", "Delete the blob
only.");
-
- private static final AllowableValue DELETE_SNAPSHOTS_ALSO = new
AllowableValue(DeleteSnapshotsOption.INCLUDE_SNAPSHOTS.name(), "Include
Snapshots", "Delete the blob and its snapshots.");
+ private static final AllowableValue DELETE_SNAPSHOTS_ALSO = new
AllowableValue(DeleteSnapshotsOptionType.INCLUDE.name(), "Include Snapshots",
"Delete the blob and its snapshots.");
- private static final AllowableValue DELETE_SNAPSHOTS_ONLY = new
AllowableValue(DeleteSnapshotsOption.DELETE_SNAPSHOTS_ONLY.name(), "Delete
Snapshots Only", "Delete only the blob's snapshots.");
+ private static final AllowableValue DELETE_SNAPSHOTS_ONLY = new
AllowableValue(DeleteSnapshotsOptionType.ONLY.name(), "Delete Snapshots Only",
"Delete only the blob's snapshots.");
Review comment:
This is a backward incompatible change in terms of existing flows.
We need to keep the `NONE` option.
We also need to keep the original names for `AllowableValue`-s and translate
them to the new enum values in `onTrigger()`.
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
##########
@@ -176,45 +174,38 @@ protected String getDefaultTimePrecision() {
protected List<BlobInfo> performListing(final ProcessContext context,
final Long minTimestamp) throws IOException {
String containerName =
context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
String prefix =
context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue();
- if (prefix == null) {
- prefix = "";
- }
+
final List<BlobInfo> listing = new ArrayList<>();
try {
- CloudBlobClient blobClient =
AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
- CloudBlobContainer container =
blobClient.getContainerReference(containerName);
+ BlobServiceClient blobServiceClient =
AzureStorageUtils.createBlobServiceClient(context, null);
+ BlobContainerClient blobContainerClient =
blobServiceClient.getBlobContainerClient(containerName);
- final OperationContext operationContext = new OperationContext();
- AzureStorageUtils.setProxy(operationContext, context);
+ final ListBlobsOptions listBlobsOptions = new ListBlobsOptions()
+ .setPrefix(prefix)
+ .setDetails(new BlobListDetails()
+ .setRetrieveMetadata(true));
- for (ListBlobItem blob : container.listBlobs(prefix, true,
EnumSet.of(BlobListingDetails.METADATA), null, operationContext)) {
- if (blob instanceof CloudBlob) {
- CloudBlob cloudBlob = (CloudBlob) blob;
- BlobProperties properties = cloudBlob.getProperties();
- StorageUri uri =
cloudBlob.getSnapshotQualifiedStorageUri();
+ blobContainerClient.listBlobs().forEach(blob -> {
+ if (blob instanceof BlobItem) {
+ BlobItem blobItem = (BlobItem) blob;
+ BlobItemProperties properties = blobItem.getProperties();
+ BlobClient blobClient =
blobContainerClient.getBlobClient(blobItem.getName());
+ String uri = blobClient.getBlobUrl();
Builder builder = new BlobInfo.Builder()
-
.primaryUri(uri.getPrimaryUri().toString())
- .blobName(cloudBlob.getName())
- .containerName(containerName)
+ .primaryUri(uri)
+ .blobName(blobItem.getName())
+
.blobType(properties.getBlobType().toString())
+
.containerName(blobClient.getContainerName())
.contentType(properties.getContentType())
.contentLanguage(properties.getContentLanguage())
- .etag(properties.getEtag())
-
.lastModifiedTime(properties.getLastModified().getTime())
- .length(properties.getLength());
-
- if (uri.getSecondaryUri() != null) {
- builder.secondaryUri(uri.getSecondaryUri().toString());
- }
-
- if (blob instanceof CloudBlockBlob) {
- builder.blobType(AzureStorageUtils.BLOCK);
- } else {
- builder.blobType(AzureStorageUtils.PAGE);
- }
+ .etag(properties.getETag())
+
.lastModifiedTime(properties.getLastModified().toEpochSecond())
+
.length(properties.getContentLength());
+
Review comment:
Minor backward incompatibility: secondaryUri is missing.
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
##########
@@ -114,14 +112,14 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
}
try {
- blob.upload(in, length, null, null, operationContext);
+ blob.upload(in, length);
BlobProperties properties = blob.getProperties();
attributes.put("azure.container", containerName);
- attributes.put("azure.primaryUri",
blob.getSnapshotQualifiedUri().toString());
- attributes.put("azure.etag", properties.getEtag());
+ attributes.put("azure.primaryUri", blob.getBlobUrl());
Review comment:
`BlobClient.getBlobUrl()` returns the slashes (/) in the blob name in
URL-encoded form.
E.g.:
`https://mystorageaccount.blob.core.windows.net/my-blob-container/dir1%2Ffile1`
It was earlier (and I think it still should be):
`https://mystorageaccount.blob.core.windows.net/my-blob-container/dir1/file1`
It seems to me to be a bug in the new client library.
It affects the List processor too.
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java
##########
@@ -147,18 +154,40 @@ private void configureControllerService(String
accountName, String accountKey, S
processContext.setProperty(AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE,
CREDENTIALS_SERVICE_VALUE);
}
- private void
assertStorageCredentialsDetailsAccountNameAndAccountKey(AzureStorageCredentialsDetails
storageCredentialsDetails) {
+ private void assertStorageCredentialsDetailsAccountNameAndAccountKey(
+ AzureStorageCredentialsDetails storageCredentialsDetails) {
+
assertEquals(ACCOUNT_NAME_VALUE,
storageCredentialsDetails.getStorageAccountName());
- assertTrue(storageCredentialsDetails.getStorageCredentials()
instanceof StorageCredentialsAccountAndKey);
- StorageCredentialsAccountAndKey storageCredentials =
(StorageCredentialsAccountAndKey)
storageCredentialsDetails.getStorageCredentials();
+ assertTrue(storageCredentialsDetails.getCredentialType() ==
AzureStorageCredentialsDetails.CredentialType.STORAGE_ACCOUNT_KEY);
Review comment:
`assertSame` should be used
Also in `assertStorageCredentialsDetailsAccountNameAndSasToken`
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureProxyUtilsGetProxyOptions.java
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.http.ProxyOptions.Type;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.storage.ListAzureBlobStorage;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.StringUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAzureProxyUtilsGetProxyOptions {
+
+ private MockProcessContext processContext;
+ private ProxyConfiguration proxyConfig;
+
+ private static final String PROXY_CONFIG_SERVICE_VALUE =
"ProxyConfigurationService";
+ private static final String PROXY_HOST = "localhost";
+ private static final Integer PROXY_PORT = 9000;
+ private static final String PROXY_USER = "microsoft";
+ private static final String PROXY_PASSWORD = "azure";
+
+ @Before
+ public void setUp() {
+ final Processor processor = new ListAzureBlobStorage();
+ processContext = new MockProcessContext(processor);
+ }
+
+ private void configureMockedHTTPProxyService(String proxyHost, Integer
proxyPort, String proxyUser, String proxyUserPassword) {
+ proxyConfig = new ProxyConfiguration();
+ proxyConfig.setProxyType(Proxy.Type.HTTP);
+
+ if(StringUtils.isNotBlank(proxyHost)) {
+ proxyConfig.setProxyServerHost(proxyHost);
+ }
+ if(proxyPort != null) {
+ proxyConfig.setProxyServerPort(proxyPort);
+ }
+ if(StringUtils.isNotBlank(proxyUser)) {
+ proxyConfig.setProxyUserName(proxyUser);
+ }
+ if(StringUtils.isNotBlank(proxyUserPassword)) {
+ proxyConfig.setProxyUserPassword(proxyUserPassword);
+ }
+
+ MockProxyConfigurationService mockProxyConfigurationService = new
MockProxyConfigurationService(proxyConfig);
+ // set mocked proxy service
+ processContext.addControllerService(mockProxyConfigurationService,
PROXY_CONFIG_SERVICE_VALUE);
+
processContext.setProperty(AzureProxyUtils.PROXY_CONFIGURATION_SERVICE,
PROXY_CONFIG_SERVICE_VALUE);
+ }
+
+ @Test
+ public void testHTTPProxy() {
+ configureMockedHTTPProxyService(PROXY_HOST, PROXY_PORT, null, null);
+ final MockProxyConfigurationService mockProxyConfigurationService =
processContext.getProperty(
+
AzureProxyUtils.PROXY_CONFIGURATION_SERVICE).asControllerService(MockProxyConfigurationService.class);
+
+ final ProxyOptions proxyOptions =
AzureProxyUtils.getProxyOptions(mockProxyConfigurationService.getConfiguration());
+ final InetSocketAddress socketAddress = new
InetSocketAddress(PROXY_HOST, PROXY_PORT);
+
+ assertEquals(proxyOptions.getAddress(), socketAddress);
+ assertEquals(proxyOptions.getType(), Type.HTTP);
Review comment:
The expected value should be the 1st argument.
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java
##########
@@ -54,25 +57,27 @@ public void setUp() {
public void testAccountNameAndAccountKeyConfiguredOnProcessor() {
configureProcessorProperties(ACCOUNT_NAME_VALUE, ACCOUNT_KEY_VALUE,
null);
- AzureStorageCredentialsDetails storageCredentialsDetails =
AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
+ AzureStorageCredentialsDetails storageCredentialsDetails =
AzureStorageUtils
+ .getStorageCredentialsDetails(processContext, null);
assertStorageCredentialsDetailsAccountNameAndAccountKey(storageCredentialsDetails);
}
@Test
public void testAccountNameAndSasTokenConfiguredOnProcessor() {
configureProcessorProperties(ACCOUNT_NAME_VALUE, null,
SAS_TOKEN_VALUE);
-
- AzureStorageCredentialsDetails storageCredentialsDetails =
AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
+ AzureStorageCredentialsDetails storageCredentialsDetails =
AzureStorageUtils
+ .getStorageCredentialsDetails(processContext, null);
assertStorageCredentialsDetailsAccountNameAndSasToken(storageCredentialsDetails);
}
@Test
public void testAccountNameAndAccountKeyConfiguredOnControllerService() {
- configureControllerService(ACCOUNT_NAME_VALUE, ACCOUNT_KEY_VALUE,
null);
- AzureStorageCredentialsDetails storageCredentialsDetails =
AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
+ configureControllerService(ACCOUNT_NAME_VALUE, ACCOUNT_KEY_VALUE,
null);
+ AzureStorageCredentialsDetails storageCredentialsDetails =
AzureStorageUtils
Review comment:
Please do not reformat the existing code.
The max. line length is 200 in checkstyle config.
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureProxyUtilsGetProxyOptions.java
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.http.ProxyOptions.Type;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.storage.ListAzureBlobStorage;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.StringUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAzureProxyUtilsGetProxyOptions {
+
+ private MockProcessContext processContext;
+ private ProxyConfiguration proxyConfig;
+
+ private static final String PROXY_CONFIG_SERVICE_VALUE =
"ProxyConfigurationService";
+ private static final String PROXY_HOST = "localhost";
+ private static final Integer PROXY_PORT = 9000;
+ private static final String PROXY_USER = "microsoft";
+ private static final String PROXY_PASSWORD = "azure";
+
+ @Before
+ public void setUp() {
+ final Processor processor = new ListAzureBlobStorage();
+ processContext = new MockProcessContext(processor);
+ }
+
+ private void configureMockedHTTPProxyService(String proxyHost, Integer
proxyPort, String proxyUser, String proxyUserPassword) {
+ proxyConfig = new ProxyConfiguration();
+ proxyConfig.setProxyType(Proxy.Type.HTTP);
+
+ if(StringUtils.isNotBlank(proxyHost)) {
+ proxyConfig.setProxyServerHost(proxyHost);
+ }
+ if(proxyPort != null) {
+ proxyConfig.setProxyServerPort(proxyPort);
+ }
+ if(StringUtils.isNotBlank(proxyUser)) {
+ proxyConfig.setProxyUserName(proxyUser);
+ }
+ if(StringUtils.isNotBlank(proxyUserPassword)) {
+ proxyConfig.setProxyUserPassword(proxyUserPassword);
+ }
+
+ MockProxyConfigurationService mockProxyConfigurationService = new
MockProxyConfigurationService(proxyConfig);
+ // set mocked proxy service
+ processContext.addControllerService(mockProxyConfigurationService,
PROXY_CONFIG_SERVICE_VALUE);
+
processContext.setProperty(AzureProxyUtils.PROXY_CONFIGURATION_SERVICE,
PROXY_CONFIG_SERVICE_VALUE);
+ }
+
+ @Test
+ public void testHTTPProxy() {
+ configureMockedHTTPProxyService(PROXY_HOST, PROXY_PORT, null, null);
+ final MockProxyConfigurationService mockProxyConfigurationService =
processContext.getProperty(
+
AzureProxyUtils.PROXY_CONFIGURATION_SERVICE).asControllerService(MockProxyConfigurationService.class);
+
+ final ProxyOptions proxyOptions =
AzureProxyUtils.getProxyOptions(mockProxyConfigurationService.getConfiguration());
+ final InetSocketAddress socketAddress = new
InetSocketAddress(PROXY_HOST, PROXY_PORT);
+
+ assertEquals(proxyOptions.getAddress(), socketAddress);
+ assertEquals(proxyOptions.getType(), Type.HTTP);
+ // null asserts
+ assertNull(proxyOptions.getUsername());
+ assertNull(proxyOptions.getPassword());
+ }
+
+ @Test
+ public void testHTTPProxyWithAuth() {
+ configureMockedHTTPProxyService(PROXY_HOST, PROXY_PORT, PROXY_USER,
PROXY_PASSWORD);
+ final MockProxyConfigurationService mockProxyConfigurationService =
processContext.getProperty(
+
AzureProxyUtils.PROXY_CONFIGURATION_SERVICE).asControllerService(MockProxyConfigurationService.class);
+
+ final ProxyOptions proxyOptions =
AzureProxyUtils.getProxyOptions(mockProxyConfigurationService.getConfiguration());
+ final InetSocketAddress socketAddress = new
InetSocketAddress(PROXY_HOST, PROXY_PORT);
+
+ assertEquals(proxyOptions.getAddress(), socketAddress);
+ assertEquals(proxyOptions.getType(), Type.HTTP);
+ assertEquals(proxyOptions.getUsername(), PROXY_USER);
+ assertEquals(proxyOptions.getPassword(), PROXY_PASSWORD);
+ }
+
+ @Test
+ public void testHTTPProxyWithOnlyUser() {
+ configureMockedHTTPProxyService(PROXY_HOST, PROXY_PORT, PROXY_USER,
null);
+ final MockProxyConfigurationService mockProxyConfigurationService =
processContext.getProperty(
+
AzureProxyUtils.PROXY_CONFIGURATION_SERVICE).asControllerService(MockProxyConfigurationService.class);
+
+ final ProxyOptions proxyOptions =
AzureProxyUtils.getProxyOptions(mockProxyConfigurationService.getConfiguration());
+ final InetSocketAddress socketAddress = new
InetSocketAddress(PROXY_HOST, PROXY_PORT);
+
+ assertEquals(proxyOptions.getAddress(), socketAddress);
+ assertEquals(proxyOptions.getType(), Type.HTTP);
+ // null asserts
+ assertNull(proxyOptions.getUsername());
+ assertNull(proxyOptions.getPassword());
+ }
+
+ @Test
+ public void testHTTPProxyWithOnlyProxyPort() {
+ configureMockedHTTPProxyService(null, PROXY_PORT, null, null);
+ final MockProxyConfigurationService mockProxyConfigurationService =
processContext.getProperty(
+
AzureProxyUtils.PROXY_CONFIGURATION_SERVICE).asControllerService(MockProxyConfigurationService.class);
+
+ final ProxyOptions proxyOptions =
AzureProxyUtils.getProxyOptions(mockProxyConfigurationService.getConfiguration());
+
+ // null asserts
+ assertNull(proxyOptions);
+ }
+
+ @Test
+ public void testHTTPProxyWithoutInput() {
+ configureMockedHTTPProxyService(null, null, null, null);
+ final MockProxyConfigurationService mockProxyConfigurationService =
processContext.getProperty(
+
AzureProxyUtils.PROXY_CONFIGURATION_SERVICE).asControllerService(MockProxyConfigurationService.class);
+
+ final ProxyOptions proxyOptions =
AzureProxyUtils.getProxyOptions(mockProxyConfigurationService.getConfiguration());
+
+ // null asserts
+ assertNull(proxyOptions);
+ }
+
+ private class MockProxyConfigurationService extends
AbstractControllerService implements ProxyConfigurationService {
Review comment:
The nested class could be static.
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java
##########
@@ -82,21 +79,23 @@ public void onTrigger(ProcessContext context,
ProcessSession session) throws Pro
final long startNanos = System.nanoTime();
final String containerName =
context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
final String blobPath =
context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
- final String deleteSnapshotOptions =
context.getProperty(DELETE_SNAPSHOTS_OPTION).getValue();
+ final String deleteSnapshotOption =
context.getProperty(DELETE_SNAPSHOTS_OPTION).isSet()
Review comment:
It seems `deleteSnapshotsOption` would be the right name.
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureProxyUtils.java
##########
@@ -0,0 +1,115 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.util.Collection;
+
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
+import org.apache.nifi.util.StringUtils;
+
+public class AzureProxyUtils {
+ private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP,
ProxySpec.SOCKS};
+
+ private static ProxyOptions.Type getProxyOptionsTypeFromProxyType(final
Proxy.Type proxyType) {
+ for (final ProxyOptions.Type item : ProxyOptions.Type.values()) {
+ if (item.toProxyType() == proxyType) {
+ return item;
+ }
+ }
+ return null;
+ }
+
+ public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE =
ProxyConfiguration
+ .createProxyConfigPropertyDescriptor(false, PROXY_SPECS);
+
+ public static HttpClient createHttpClient(final PropertyContext
propertyContext) {
+ final ProxyConfiguration proxyConfig =
ProxyConfiguration.getConfiguration(propertyContext);
+ final ProxyOptions proxyOptions = getProxyOptions(proxyConfig);
+
+ final HttpClient client = new NettyAsyncHttpClientBuilder()
+ .proxy(proxyOptions)
+ .build();
+
+ return client;
+ }
+
+ public static void validateProxySpec(final ValidationContext context,
final Collection<ValidationResult> results) {
+ final ProxyConfiguration proxyConfig =
ProxyConfiguration.getConfiguration(context);
+
+ final String proxyServerHost = proxyConfig.getProxyServerHost();
+ final Integer proxyServerPort = proxyConfig.getProxyServerPort();
+ final String proxyServerUser = proxyConfig.getProxyUserName();
+ final String proxyServerPassword = proxyConfig.getProxyUserPassword();
+
+ if ((StringUtils.isNotBlank(proxyServerHost) && proxyServerPort ==
null)
+ || (StringUtils.isBlank(proxyServerHost) && proxyServerPort !=
null)) {
+ results.add(new
ValidationResult.Builder().subject("AzureProxyUtils Details").valid(false)
Review comment:
"Proxy Configuration" would be a more understandable subject for the end
user I think.
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureProxyUtils.java
##########
@@ -0,0 +1,115 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.util.Collection;
+
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
+import org.apache.nifi.util.StringUtils;
+
+public class AzureProxyUtils {
+ private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP,
ProxySpec.SOCKS};
+
+ private static ProxyOptions.Type getProxyOptionsTypeFromProxyType(final
Proxy.Type proxyType) {
+ for (final ProxyOptions.Type item : ProxyOptions.Type.values()) {
+ if (item.toProxyType() == proxyType) {
+ return item;
+ }
+ }
+ return null;
+ }
+
+ public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE =
ProxyConfiguration
+ .createProxyConfigPropertyDescriptor(false, PROXY_SPECS);
+
+ public static HttpClient createHttpClient(final PropertyContext
propertyContext) {
+ final ProxyConfiguration proxyConfig =
ProxyConfiguration.getConfiguration(propertyContext);
+ final ProxyOptions proxyOptions = getProxyOptions(proxyConfig);
+
+ final HttpClient client = new NettyAsyncHttpClientBuilder()
+ .proxy(proxyOptions)
+ .build();
+
+ return client;
+ }
+
+ public static void validateProxySpec(final ValidationContext context,
final Collection<ValidationResult> results) {
+ final ProxyConfiguration proxyConfig =
ProxyConfiguration.getConfiguration(context);
+
+ final String proxyServerHost = proxyConfig.getProxyServerHost();
+ final Integer proxyServerPort = proxyConfig.getProxyServerPort();
+ final String proxyServerUser = proxyConfig.getProxyUserName();
+ final String proxyServerPassword = proxyConfig.getProxyUserPassword();
+
+ if ((StringUtils.isNotBlank(proxyServerHost) && proxyServerPort ==
null)
Review comment:
These checks are relevant in case of `HTTP` and `SOCKS` proxy but not
for `DIRECT`.
In case of `DIRECT` (that is no proxy), it could be checked that no other
properties are specified.
##########
File path:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureProxyUtils.java
##########
@@ -0,0 +1,115 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.util.Collection;
+
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
+import org.apache.nifi.util.StringUtils;
+
+public class AzureProxyUtils {
+ private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP,
ProxySpec.SOCKS};
+
+ private static ProxyOptions.Type getProxyOptionsTypeFromProxyType(final
Proxy.Type proxyType) {
+ for (final ProxyOptions.Type item : ProxyOptions.Type.values()) {
+ if (item.toProxyType() == proxyType) {
+ return item;
+ }
+ }
+ return null;
+ }
+
+ public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE =
ProxyConfiguration
+ .createProxyConfigPropertyDescriptor(false, PROXY_SPECS);
+
+ public static HttpClient createHttpClient(final PropertyContext
propertyContext) {
+ final ProxyConfiguration proxyConfig =
ProxyConfiguration.getConfiguration(propertyContext);
+ final ProxyOptions proxyOptions = getProxyOptions(proxyConfig);
+
+ final HttpClient client = new NettyAsyncHttpClientBuilder()
+ .proxy(proxyOptions)
+ .build();
+
+ return client;
+ }
+
+ public static void validateProxySpec(final ValidationContext context,
final Collection<ValidationResult> results) {
+ final ProxyConfiguration proxyConfig =
ProxyConfiguration.getConfiguration(context);
+
+ final String proxyServerHost = proxyConfig.getProxyServerHost();
+ final Integer proxyServerPort = proxyConfig.getProxyServerPort();
+ final String proxyServerUser = proxyConfig.getProxyUserName();
+ final String proxyServerPassword = proxyConfig.getProxyUserPassword();
+
+ if ((StringUtils.isNotBlank(proxyServerHost) && proxyServerPort ==
null)
+ || (StringUtils.isBlank(proxyServerHost) && proxyServerPort !=
null)) {
+ results.add(new
ValidationResult.Builder().subject("AzureProxyUtils Details").valid(false)
+ .explanation(
+ "When specifying address information, both `host`
and `port` information must be provided.")
+ .build());
+ }
+
+ if ((StringUtils.isBlank(proxyServerUser) &&
StringUtils.isNotBlank(proxyServerPassword))
+ || (StringUtils.isNotBlank(proxyServerUser) &&
StringUtils.isBlank(proxyServerPassword))) {
+ results.add(new
ValidationResult.Builder().subject("AzureProxyUtils Details").valid(false)
+ .explanation(
+ "When specifying credentials, both `user` and
`password` must be provided.")
+ .build());
+ }
+
+ ProxyConfiguration.validateProxySpec(context, results, PROXY_SPECS);
+ }
+
+ public static ProxyOptions getProxyOptions(final ProxyConfiguration
proxyConfig) {
+ final String proxyServerHost = proxyConfig.getProxyServerHost();
+ final Integer proxyServerPort = proxyConfig.getProxyServerPort();
+ final String proxyServerUser = proxyConfig.getProxyUserName();
+ final String proxyServerPassword = proxyConfig.getProxyUserPassword();
+
+ final Boolean proxyServerProvided =
StringUtils.isNotBlank(proxyServerHost) && proxyServerPort != null;
+ final Boolean proxyCredentialsProvided =
StringUtils.isNotBlank(proxyServerUser) &&
StringUtils.isNotBlank(proxyServerPassword);
+
+ // if no endpoint is provided, return zero
+ if (!proxyServerProvided) {
+ return null;
+ }
+
+ // translate Proxy.Type to ProxyOptions.Type
+ final ProxyOptions.Type proxyType =
getProxyOptionsTypeFromProxyType(proxyConfig.getProxyType());
+ final InetSocketAddress socketAddress = new
InetSocketAddress(proxyServerHost, proxyServerPort);
+
+ final ProxyOptions proxyOptions = new ProxyOptions(proxyType,
socketAddress);
+
+ if (proxyCredentialsProvided) {
+ return proxyOptions.setCredentials(proxyServerUser,
proxyServerPassword);
+ } else {
+ return proxyOptions;
+ }
Review comment:
```
if (proxyCredentialsProvided) {
proxyOptions.setCredentials(proxyServerUser,
proxyServerPassword);
}
return proxyOptions;
```
would be more readable.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]