This is an automated email from the ASF dual-hosted git repository.
nsabonyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f7e36a07ac NIFI-11228 Removed deprecated Azure Blob Storage Processors
f7e36a07ac is described below
commit f7e36a07acb09a6aced0149450757445b76a06fb
Author: exceptionfactory <[email protected]>
AuthorDate: Tue May 9 09:46:19 2023 -0500
NIFI-11228 Removed deprecated Azure Blob Storage Processors
This closes #7234
Signed-off-by: Nandor Soma Abonyi <[email protected]>
---
.../nifi/runtime/manifest/TestRuntimeManifest.java | 9 -
.../nifi-azure-processors/pom.xml | 12 +-
.../azure/AbstractAzureBlobProcessor.java | 116 ----------
.../azure/storage/DeleteAzureBlobStorage.java | 107 ---------
.../azure/storage/FetchAzureBlobStorage.java | 167 --------------
.../azure/storage/ListAzureBlobStorage.java | 255 ---------------------
.../azure/storage/PutAzureBlobStorage.java | 221 ------------------
.../utils/AzureBlobClientSideEncryptionUtils.java | 118 ----------
.../azure/storage/utils/AzureStorageUtils.java | 14 --
.../services/org.apache.nifi.processor.Processor | 4 -
.../azure/storage/AbstractAzureBlobStorageIT.java | 72 ------
.../azure/storage/ITAzureBlobStorageE2E.java | 245 --------------------
.../azure/storage/ITDeleteAzureBlobStorage.java | 66 ------
.../azure/storage/ITFetchAzureBlobStorage.java | 138 -----------
.../azure/storage/ITListAzureBlobStorage.java | 129 -----------
.../azure/storage/ITPutAzureBlobStorage.java | 165 -------------
.../azure/storage/TestPutAzureBlobStorage.java | 48 ----
.../storage/queue/TestPutAzureQueueStorage.java | 5 +-
.../TestAzureBlobClientSideEncryptionUtils.java | 160 -------------
...reStorageUtilsGetStorageCredentialsDetails.java | 165 -------------
...reStorageUtilsValidateCredentialProperties.java | 4 +-
21 files changed, 7 insertions(+), 2213 deletions(-)
diff --git
a/nifi-manifest/nifi-runtime-manifest-test/src/test/java/org/apache/nifi/runtime/manifest/TestRuntimeManifest.java
b/nifi-manifest/nifi-runtime-manifest-test/src/test/java/org/apache/nifi/runtime/manifest/TestRuntimeManifest.java
index d1ccd7a929..817a891596 100644
---
a/nifi-manifest/nifi-runtime-manifest-test/src/test/java/org/apache/nifi/runtime/manifest/TestRuntimeManifest.java
+++
b/nifi-manifest/nifi-runtime-manifest-test/src/test/java/org/apache/nifi/runtime/manifest/TestRuntimeManifest.java
@@ -271,15 +271,6 @@ class TestRuntimeManifest {
assertNotNull(routeOnAttributeDef.getDynamicProperties().get(0).getValue());
assertNotNull(routeOnAttributeDef.getDynamicProperties().get(0).getExpressionLanguageScope());
- // Verify DeleteAzureBlobStorage is deprecated
- final ProcessorDefinition deleteAzureBlobDef =
getProcessorDefinition(bundles, "nifi-azure-nar",
-
"org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage");
- assertNotNull(deleteAzureBlobDef.getDeprecated());
- assertTrue(deleteAzureBlobDef.getDeprecated().booleanValue());
- assertNotNull(deleteAzureBlobDef.getDeprecationReason());
- assertNotNull(deleteAzureBlobDef.getDeprecationAlternatives());
- assertFalse(deleteAzureBlobDef.getDeprecationAlternatives().isEmpty());
-
// Verify SplitJson has @SystemResourceConsiderations
final ProcessorDefinition splitJsonDef =
getProcessorDefinition(bundles, "nifi-standard-nar",
"org.apache.nifi.processors.standard.SplitJson");
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 197ab20e76..87888cac8c 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -19,9 +19,6 @@
</parent>
<artifactId>nifi-azure-processors</artifactId>
<packaging>jar</packaging>
- <properties>
- <azure-keyvault.version>1.2.6</azure-keyvault.version>
- </properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
@@ -96,11 +93,6 @@
<artifactId>azure-security-keyvault-keys</artifactId>
</dependency>
<!-- Legacy Microsoft Azure Libraries -->
- <dependency>
- <groupId>com.microsoft.azure</groupId>
- <artifactId>azure-keyvault</artifactId>
- <version>${azure-keyvault.version}</version>
- </dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
@@ -113,6 +105,10 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
deleted file mode 100644
index 9e2b855cc7..0000000000
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure;
-
-import com.microsoft.azure.keyvault.cryptography.SymmetricKey;
-import com.microsoft.azure.storage.blob.BlobEncryptionPolicy;
-import com.microsoft.azure.storage.blob.BlobRequestOptions;
-import org.apache.commons.codec.DecoderException;
-import org.apache.commons.codec.binary.Hex;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.util.StandardValidators;
-import
org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionMethod;
-import
org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-public abstract class AbstractAzureBlobProcessor extends AbstractProcessor {
-
- public static final PropertyDescriptor BLOB = new
PropertyDescriptor.Builder()
- .name("blob")
- .displayName("Blob")
- .description("The filename of the blob")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .required(true)
- .defaultValue("${azure.blobname}")
- .build();
-
- public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("All successfully processed FlowFiles are routed to
this relationship")
- .build();
- public static final Relationship REL_FAILURE = new Relationship.Builder()
- .name("failure")
- .description("Unsuccessful operations will be transferred to the
failure relationship.")
- .build();
-
- private static final List<PropertyDescriptor> PROPERTIES = Collections
- .unmodifiableList(Arrays.asList(
- AzureStorageUtils.CONTAINER,
- AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE,
- AzureStorageUtils.ACCOUNT_NAME,
- AzureStorageUtils.ACCOUNT_KEY,
- AzureStorageUtils.PROP_SAS_TOKEN,
- AzureStorageUtils.ENDPOINT_SUFFIX,
- BLOB,
- AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
-
- private static final Set<Relationship> RELATIONSHIPS =
Collections.unmodifiableSet(
- new HashSet<>(Arrays.asList(
- AbstractAzureBlobProcessor.REL_SUCCESS,
- AbstractAzureBlobProcessor.REL_FAILURE)));
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return PROPERTIES;
- }
-
- @Override
- protected Collection<ValidationResult> customValidate(ValidationContext
validationContext) {
- final Collection<ValidationResult> results =
AzureStorageUtils.validateCredentialProperties(validationContext);
- AzureStorageUtils.validateProxySpec(validationContext, results);
- return results;
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return RELATIONSHIPS;
- }
-
- protected BlobRequestOptions createBlobRequestOptions(ProcessContext
context) throws DecoderException {
- final String cseKeyTypeValue =
context.getProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE).getValue();
- final AzureBlobClientSideEncryptionMethod cseKeyType =
AzureBlobClientSideEncryptionMethod.valueOf(cseKeyTypeValue);
-
- final String cseKeyId =
context.getProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID).getValue();
-
- final String cseSymmetricKeyHex =
context.getProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX).getValue();
-
- BlobRequestOptions blobRequestOptions = new BlobRequestOptions();
-
- if (cseKeyType == AzureBlobClientSideEncryptionMethod.SYMMETRIC) {
- byte[] keyBytes = Hex.decodeHex(cseSymmetricKeyHex.toCharArray());
- SymmetricKey key = new SymmetricKey(cseKeyId, keyBytes);
- BlobEncryptionPolicy policy = new BlobEncryptionPolicy(key, null);
- blobRequestOptions.setEncryptionPolicy(policy);
- }
-
- return blobRequestOptions;
- }
-}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java
deleted file mode 100644
index 3e4b236299..0000000000
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage;
-
-import com.microsoft.azure.storage.OperationContext;
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlob;
-import com.microsoft.azure.storage.blob.CloudBlobClient;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
-import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.DeprecationNotice;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
-
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
-@SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class,
PutAzureBlobStorage.class})
-@CapabilityDescription("Deletes the provided blob from Azure Storage")
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@DeprecationNotice(alternatives = DeleteAzureBlobStorage_v12.class, reason =
"Processor depends on legacy Microsoft Azure SDK")
-public class DeleteAzureBlobStorage extends AbstractAzureBlobProcessor {
-
- private static final AllowableValue DELETE_SNAPSHOTS_NONE = new
AllowableValue(DeleteSnapshotsOption.NONE.name(), "None", "Delete the blob
only.");
-
- private static final AllowableValue DELETE_SNAPSHOTS_ALSO = new
AllowableValue(DeleteSnapshotsOption.INCLUDE_SNAPSHOTS.name(), "Include
Snapshots", "Delete the blob and its snapshots.");
-
- private static final AllowableValue DELETE_SNAPSHOTS_ONLY = new
AllowableValue(DeleteSnapshotsOption.DELETE_SNAPSHOTS_ONLY.name(), "Delete
Snapshots Only", "Delete only the blob's snapshots.");
-
- private static final PropertyDescriptor DELETE_SNAPSHOTS_OPTION = new
PropertyDescriptor.Builder()
- .name("delete-snapshots-option")
- .displayName("Delete Snapshots Option")
- .description("Specifies the snapshot deletion options to be used
when deleting a blob.")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .allowableValues(DELETE_SNAPSHOTS_NONE, DELETE_SNAPSHOTS_ALSO,
DELETE_SNAPSHOTS_ONLY)
- .defaultValue(DELETE_SNAPSHOTS_NONE.getValue())
- .required(true)
- .build();
-
- @Override
- public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- List<PropertyDescriptor> properties = new
ArrayList<>(super.getSupportedPropertyDescriptors());
- properties.add(DELETE_SNAPSHOTS_OPTION);
- return properties;
- }
-
- @Override
- public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
- FlowFile flowFile = session.get();
-
- if(flowFile == null) {
- return;
- }
-
- final long startNanos = System.nanoTime();
- final String containerName =
context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
- final String blobPath =
context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
- final String deleteSnapshotOptions =
context.getProperty(DELETE_SNAPSHOTS_OPTION).getValue();
-
- try {
- CloudBlobClient blobClient =
AzureStorageUtils.createCloudBlobClient(context, getLogger(), flowFile);
- CloudBlobContainer container =
blobClient.getContainerReference(containerName);
- CloudBlob blob = container.getBlockBlobReference(blobPath);
-
- final OperationContext operationContext = new OperationContext();
- AzureStorageUtils.setProxy(operationContext, context);
-
blob.deleteIfExists(DeleteSnapshotsOption.valueOf(deleteSnapshotOptions), null,
null, operationContext);
- session.transfer(flowFile, REL_SUCCESS);
-
- final long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- session.getProvenanceReporter().invokeRemoteProcess(flowFile,
blob.getSnapshotQualifiedUri().toString(), "Blob deleted");
- } catch ( StorageException | URISyntaxException e) {
- getLogger().error("Failed to delete the specified blob {} from
Azure Storage. Routing to failure", new Object[]{blobPath}, e);
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- }
- }
-}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
deleted file mode 100644
index c4567edecd..0000000000
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage;
-
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import com.microsoft.azure.storage.OperationContext;
-import org.apache.commons.codec.DecoderException;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.DeprecationNotice;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
-import
org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
-
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlob;
-import com.microsoft.azure.storage.blob.CloudBlobClient;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
-import com.microsoft.azure.storage.blob.BlobRequestOptions;
-
-@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
-@CapabilityDescription("Retrieves contents of an Azure Storage Blob, writing
the contents to the content of the FlowFile")
-@SeeAlso({ ListAzureBlobStorage.class, PutAzureBlobStorage.class,
DeleteAzureBlobStorage.class })
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@WritesAttributes({
- @WritesAttribute(attribute = "azure.length", description = "The length of
the blob fetched")
-})
-@DeprecationNotice(alternatives = FetchAzureBlobStorage_v12.class, reason =
"Processor depends on legacy Microsoft Azure SDK")
-public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
-
- public static final PropertyDescriptor RANGE_START = new
PropertyDescriptor.Builder()
- .name("range-start")
- .displayName("Range Start")
- .description("The byte position at which to start reading from the
blob. An empty value or a value of " +
- "zero will start reading at the beginning of the blob.")
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .required(false)
- .build();
-
- public static final PropertyDescriptor RANGE_LENGTH = new
PropertyDescriptor.Builder()
- .name("range-length")
- .displayName("Range Length")
- .description("The number of bytes to download from the blob,
starting from the Range Start. An empty " +
- "value or a value that extends beyond the end of the blob
will read to the end of the blob.")
- .addValidator(StandardValidators.createDataSizeBoundsValidator(1,
Long.MAX_VALUE))
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .required(false)
- .build();
-
- @Override
- protected Collection<ValidationResult> customValidate(ValidationContext
validationContext) {
- final List<ValidationResult> results = new
ArrayList<>(super.customValidate(validationContext));
-
results.addAll(AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext));
- return results;
- }
-
- @Override
- public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- List<PropertyDescriptor> properties = new
ArrayList<>(super.getSupportedPropertyDescriptors());
- properties.add(RANGE_START);
- properties.add(RANGE_LENGTH);
- properties.add(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE);
- properties.add(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID);
-
properties.add(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX);
- return properties;
- }
-
- @Override
- public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
- }
-
- final long startNanos = System.nanoTime();
-
- final String containerName =
context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
- final String blobPath =
context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
- final long rangeStart = (context.getProperty(RANGE_START).isSet() ?
context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue()
: 0L);
- final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ?
context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue()
: null);
-
- AtomicReference<Exception> storedException = new AtomicReference<>();
- try {
- CloudBlobClient blobClient =
AzureStorageUtils.createCloudBlobClient(context, getLogger(), flowFile);
- CloudBlobContainer container =
blobClient.getContainerReference(containerName);
-
- final OperationContext operationContext = new OperationContext();
- AzureStorageUtils.setProxy(operationContext, context);
-
- final Map<String, String> attributes = new HashMap<>();
- final CloudBlob blob = container.getBlockBlobReference(blobPath);
-
- BlobRequestOptions blobRequestOptions =
createBlobRequestOptions(context);
-
- // TODO - we may be able do fancier things with ranges and
- // distribution of download over threads, investigate
- flowFile = session.write(flowFile, os -> {
- try {
- blob.downloadRange(rangeStart, rangeLength, os, null,
blobRequestOptions, operationContext);
- } catch (StorageException e) {
- storedException.set(e);
- throw new IOException(e);
- }
- });
-
- long length = blob.getProperties().getLength();
- attributes.put("azure.length", String.valueOf(length));
-
- if (!attributes.isEmpty()) {
- flowFile = session.putAllAttributes(flowFile, attributes);
- }
-
- session.transfer(flowFile, REL_SUCCESS);
- final long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- session.getProvenanceReporter().fetch(flowFile,
blob.getSnapshotQualifiedUri().toString(), transferMillis);
- } catch (IllegalArgumentException | URISyntaxException |
StorageException | ProcessException | DecoderException e) {
- if (e instanceof ProcessException && storedException.get() ==
null) {
- throw (ProcessException) e;
- } else {
- Exception failureException =
Optional.ofNullable(storedException.get()).orElse(e);
- getLogger().error("Failure to fetch Azure blob {}", new
Object[]{blobPath}, failureException);
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- }
- }
- }
-}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
deleted file mode 100644
index f2d0f6bbc3..0000000000
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.microsoft.azure.storage.OperationContext;
-import com.microsoft.azure.storage.ResultContinuation;
-import com.microsoft.azure.storage.ResultSegment;
-import com.microsoft.azure.storage.StorageUri;
-import com.microsoft.azure.storage.blob.BlobListingDetails;
-import com.microsoft.azure.storage.blob.BlobProperties;
-import com.microsoft.azure.storage.blob.CloudBlob;
-import com.microsoft.azure.storage.blob.CloudBlobClient;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
-import com.microsoft.azure.storage.blob.CloudBlockBlob;
-import com.microsoft.azure.storage.blob.ListBlobItem;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
-import org.apache.nifi.annotation.behavior.Stateful;
-import org.apache.nifi.annotation.behavior.TriggerSerially;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.configuration.DefaultSchedule;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.DeprecationNotice;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.state.Scope;
-import org.apache.nifi.context.PropertyContext;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processor.util.list.AbstractListProcessor;
-import org.apache.nifi.processor.util.list.ListedEntityTracker;
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
-import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
-import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
-import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.apache.nifi.serialization.record.RecordSchema;
-
-import java.util.Optional;
-
-@PrimaryNodeOnly
-@TriggerSerially
-@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
-@SeeAlso({ FetchAzureBlobStorage.class, PutAzureBlobStorage.class,
DeleteAzureBlobStorage.class })
-@CapabilityDescription("Lists blobs in an Azure Storage container. Listing
details are attached to an empty FlowFile for use with FetchAzureBlobStorage.
" +
- "This Processor is designed to run on Primary Node only in a cluster.
If the primary node changes, the new Primary Node will pick up where the " +
- "previous node left off without duplicating all of the data.")
-@InputRequirement(Requirement.INPUT_FORBIDDEN)
-@WritesAttributes({ @WritesAttribute(attribute = "azure.container",
description = "The name of the Azure container"),
- @WritesAttribute(attribute = "azure.blobname", description = "The name
of the Azure blob"),
- @WritesAttribute(attribute = "azure.primaryUri", description =
"Primary location for blob content"),
- @WritesAttribute(attribute = "azure.secondaryUri", description =
"Secondary location for blob content"),
- @WritesAttribute(attribute = "azure.etag", description = "Etag for the
Azure blob"),
- @WritesAttribute(attribute = "azure.length", description = "Length of
the blob"),
- @WritesAttribute(attribute = "azure.timestamp", description = "The
timestamp in Azure for the blob"),
- @WritesAttribute(attribute = "mime.type", description = "MimeType of
the content"),
- @WritesAttribute(attribute = "lang", description = "Language code for
the content"),
- @WritesAttribute(attribute = "azure.blobtype", description = "This is
the type of blob and can be either page or block type") })
-@Stateful(scopes = { Scope.CLUSTER }, description = "After performing a
listing of blobs, the timestamp of the newest blob is stored. " +
- "This allows the Processor to list only blobs that have been added or
modified after this date the next time that the Processor is run. State is " +
- "stored across the cluster so that this Processor can be run on
Primary Node only and if a new Primary Node is selected, the new node can pick
up " +
- "where the previous node left off, without duplicating the data.")
-@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
-@DeprecationNotice(alternatives = ListAzureBlobStorage_v12.class, reason =
"Processor depends on legacy Microsoft Azure SDK")
-public class ListAzureBlobStorage extends AbstractListAzureProcessor<BlobInfo>
{
-
- private static final PropertyDescriptor PROP_PREFIX = new
PropertyDescriptor.Builder()
- .name("prefix")
- .displayName("Prefix")
- .description("Search prefix for listing")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .required(false)
- .build();
-
- private static final List<PropertyDescriptor> PROPERTIES =
Collections.unmodifiableList(Arrays.asList(
- LISTING_STRATEGY,
- AbstractListProcessor.RECORD_WRITER,
- AzureStorageUtils.CONTAINER,
- AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE,
- AzureStorageUtils.ACCOUNT_NAME,
- AzureStorageUtils.ACCOUNT_KEY,
- AzureStorageUtils.PROP_SAS_TOKEN,
- AzureStorageUtils.ENDPOINT_SUFFIX,
- PROP_PREFIX,
- AzureStorageUtils.PROXY_CONFIGURATION_SERVICE,
- ListedEntityTracker.TRACKING_STATE_CACHE,
- ListedEntityTracker.TRACKING_TIME_WINDOW,
- ListedEntityTracker.INITIAL_LISTING_TARGET,
- MIN_AGE,
- MAX_AGE,
- MIN_SIZE,
- MAX_SIZE
- ));
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return PROPERTIES;
- }
-
- @Override
- protected void customValidate(ValidationContext validationContext,
Collection<ValidationResult> results) {
-
results.addAll(AzureStorageUtils.validateCredentialProperties(validationContext));
- AzureStorageUtils.validateProxySpec(validationContext, results);
- }
-
- @Override
- protected Map<String, String> createAttributes(BlobInfo entity,
ProcessContext context) {
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("azure.container", entity.getContainerName());
- attributes.put("azure.etag", entity.getEtag());
- attributes.put("azure.primaryUri", entity.getPrimaryUri());
- attributes.put("azure.secondaryUri", entity.getSecondaryUri());
- attributes.put("azure.blobname", entity.getBlobName());
- attributes.put("filename", entity.getName());
- attributes.put("azure.blobtype", entity.getBlobType());
- attributes.put("azure.length", String.valueOf(entity.getLength()));
- attributes.put("azure.timestamp",
String.valueOf(entity.getTimestamp()));
- attributes.put("mime.type", entity.getContentType());
- attributes.put("lang", entity.getContentLanguage());
-
- return attributes;
- }
-
- @Override
- protected String getListingContainerName(final ProcessContext context) {
- return String.format("Azure Blob Storage Container [%s]",
getPath(context));
- }
-
- @Override
- protected String getPath(final ProcessContext context) {
- return
context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
- }
-
- @Override
- protected boolean isListingResetNecessary(final PropertyDescriptor
property) {
- // re-list if configuration changed, but not when security keys are
rolled (not included in the condition)
- return PROP_PREFIX.equals(property)
- || AzureStorageUtils.ACCOUNT_NAME.equals(property)
- || AzureStorageUtils.CONTAINER.equals(property)
- || AzureStorageUtils.PROP_SAS_TOKEN.equals(property);
- }
-
- @Override
- protected Scope getStateScope(final PropertyContext context) {
- return Scope.CLUSTER;
- }
-
- @Override
- protected RecordSchema getRecordSchema() {
- return BlobInfo.getRecordSchema();
- }
-
- @Override
- protected String getDefaultTimePrecision() {
- // User does not have to choose one.
- // AUTO_DETECT can handle most cases, but it may incur longer latency
- // when all listed files do not have SECOND part in their timestamps
although Azure Blob Storage does support seconds.
- return PRECISION_SECONDS.getValue();
- }
-
- @Override
- protected List<BlobInfo> performListing(final ProcessContext context,
final Long minTimestamp, final ListingMode listingMode) throws IOException {
- final String containerName =
context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
- final String prefix =
Optional.ofNullable(context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue()).orElse("");
- final List<BlobInfo> listing = new ArrayList<>();
- final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp;
-
- try {
- final CloudBlobClient blobClient =
AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
- final CloudBlobContainer container =
blobClient.getContainerReference(containerName);
-
- final OperationContext operationContext = new OperationContext();
- AzureStorageUtils.setProxy(operationContext, context);
-
- ResultContinuation continuationToken = null;
-
- do {
- final ResultSegment<ListBlobItem> result =
container.listBlobsSegmented(prefix, true,
EnumSet.of(BlobListingDetails.METADATA), null, continuationToken, null,
operationContext);
- continuationToken = result.getContinuationToken();
-
- for (final ListBlobItem blob : result.getResults()) {
- if (blob instanceof CloudBlob) {
- final CloudBlob cloudBlob = (CloudBlob) blob;
- final BlobProperties properties =
cloudBlob.getProperties();
-
- if (isFileInfoMatchesWithAgeAndSize(context,
minimumTimestamp, properties.getLastModified().getTime(),
properties.getLength())) {
- final StorageUri uri =
cloudBlob.getSnapshotQualifiedStorageUri();
-
- final Builder builder = new BlobInfo.Builder()
- .primaryUri(uri.getPrimaryUri().toString())
- .blobName(cloudBlob.getName())
- .containerName(containerName)
- .contentType(properties.getContentType())
-
.contentLanguage(properties.getContentLanguage())
- .etag(properties.getEtag())
-
.lastModifiedTime(properties.getLastModified().getTime())
- .length(properties.getLength());
-
- if (uri.getSecondaryUri() != null) {
-
builder.secondaryUri(uri.getSecondaryUri().toString());
- }
-
- if (blob instanceof CloudBlockBlob) {
- builder.blobType(AzureStorageUtils.BLOCK);
- } else {
- builder.blobType(AzureStorageUtils.PAGE);
- }
- listing.add(builder.build());
- }
- }
- }
- } while (continuationToken != null);
- } catch (final Throwable t) {
- throw new IOException(ExceptionUtils.getRootCause(t));
- }
- return listing;
- }
-
- // Unfiltered listing is not supported - must provide a prefix
- @Override
- protected Integer countUnfilteredListing(final ProcessContext context) {
- return null;
- }
-}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
deleted file mode 100644
index 6cd779f381..0000000000
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage;
-
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.FilterInputStream;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import com.microsoft.azure.storage.OperationContext;
-import org.apache.commons.codec.DecoderException;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.DeprecationNotice;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
-import
org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
-
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.BlobProperties;
-import com.microsoft.azure.storage.blob.CloudBlob;
-import com.microsoft.azure.storage.blob.CloudBlobClient;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
-import com.microsoft.azure.storage.blob.BlobRequestOptions;
-
-@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
-@SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class,
DeleteAzureBlobStorage.class })
-@CapabilityDescription("Puts content into an Azure Storage Blob")
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@WritesAttributes({ @WritesAttribute(attribute = "azure.container",
description = "The name of the Azure container"),
- @WritesAttribute(attribute = "azure.blobname", description = "The name
of the Azure blob"),
- @WritesAttribute(attribute = "azure.primaryUri", description =
"Primary location for blob content"),
- @WritesAttribute(attribute = "azure.etag", description = "Etag for the
Azure blob"),
- @WritesAttribute(attribute = "azure.length", description = "Length of
the blob"),
- @WritesAttribute(attribute = "azure.timestamp", description = "The
timestamp in Azure for the blob")})
-@DeprecationNotice(alternatives = PutAzureBlobStorage_v12.class, reason =
"Processor depends on legacy Microsoft Azure SDK")
-public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
-
- public static final PropertyDescriptor BLOB_NAME = new
PropertyDescriptor.Builder()
- .name("blob")
- .displayName("Blob")
- .description("The filename of the blob")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .required(true)
- .build();
-
- public static final PropertyDescriptor CREATE_CONTAINER = new
PropertyDescriptor.Builder()
- .name("azure-create-container")
- .displayName("Create Container")
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
- .required(true)
- .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
- .allowableValues("true", "false")
- .defaultValue("false")
- .description("Specifies whether to check if the container exists
and to automatically create it if it does not. " +
- "Permission to list containers is required. If false, this
check is not made, but the Put operation " +
- "will fail if the container does not exist.")
- .build();
-
- @Override
- protected Collection<ValidationResult> customValidate(ValidationContext
validationContext) {
- final List<ValidationResult> results = new
ArrayList<>(super.customValidate(validationContext));
-
results.addAll(AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext));
- return results;
- }
-
- @Override
- public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- List<PropertyDescriptor> properties = new
ArrayList<>(super.getSupportedPropertyDescriptors());
- properties.remove(BLOB);
- properties.add(BLOB_NAME);
- properties.add(CREATE_CONTAINER);
- properties.add(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE);
- properties.add(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID);
-
properties.add(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX);
- return properties;
- }
-
- public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
- }
-
- final long startNanos = System.nanoTime();
-
- String containerName =
context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
-
- String blobPath =
context.getProperty(BLOB_NAME).evaluateAttributeExpressions(flowFile).getValue();
-
- final boolean createContainer =
context.getProperty(CREATE_CONTAINER).asBoolean();
-
- AtomicReference<Exception> storedException = new AtomicReference<>();
- try {
- CloudBlobClient blobClient =
AzureStorageUtils.createCloudBlobClient(context, getLogger(), flowFile);
- CloudBlobContainer container =
blobClient.getContainerReference(containerName);
-
- if (createContainer)
- container.createIfNotExists();
-
- CloudBlob blob = container.getBlockBlobReference(blobPath);
-
- final OperationContext operationContext = new OperationContext();
- AzureStorageUtils.setProxy(operationContext, context);
-
- BlobRequestOptions blobRequestOptions =
createBlobRequestOptions(context);
-
- final Map<String, String> attributes = new HashMap<>();
- long length = flowFile.getSize();
- session.read(flowFile, rawIn -> {
- InputStream in = rawIn;
- if (!(in instanceof BufferedInputStream)) {
- // do not double-wrap
- in = new BufferedInputStream(rawIn);
- }
-
- // If markSupported() is true and a file length is provided,
- // Blobs are not uploaded in blocks resulting in OOME for large
- // files. The UnmarkableInputStream wrapper class disables
- // mark() and reset() to help force uploading files in chunks.
- if (in.markSupported()) {
- in = new UnmarkableInputStream(in);
- }
-
- try {
- uploadBlob(blob, operationContext, blobRequestOptions, in);
- BlobProperties properties = blob.getProperties();
- attributes.put("azure.container", containerName);
- attributes.put("azure.primaryUri",
blob.getSnapshotQualifiedUri().toString());
- attributes.put("azure.etag", properties.getEtag());
- attributes.put("azure.length", String.valueOf(length));
- attributes.put("azure.timestamp",
String.valueOf(properties.getLastModified()));
- } catch (StorageException | URISyntaxException | IOException
e) {
- storedException.set(e);
- throw e instanceof IOException ? (IOException) e : new
IOException(e);
- }
- });
-
- if (!attributes.isEmpty()) {
- flowFile = session.putAllAttributes(flowFile, attributes);
- }
- session.transfer(flowFile, REL_SUCCESS);
-
- final long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- session.getProvenanceReporter().send(flowFile,
blob.getSnapshotQualifiedUri().toString(), transferMillis);
-
- } catch (IllegalArgumentException | URISyntaxException |
StorageException | ProcessException | DecoderException e) {
- if (e instanceof ProcessException && storedException.get() ==
null) {
- throw (ProcessException) e;
- } else {
- Exception failureException =
Optional.ofNullable(storedException.get()).orElse(e);
- getLogger().error("Failed to put Azure blob {}", new
Object[]{blobPath}, failureException);
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- }
- }
-
- }
-
- void uploadBlob(CloudBlob blob, OperationContext operationContext,
BlobRequestOptions blobRequestOptions, InputStream in) throws StorageException,
IOException {
- blob.upload(in, -1, null, blobRequestOptions, operationContext);
- }
-
- // Used to help force Azure Blob SDK to write in blocks
- private static class UnmarkableInputStream extends FilterInputStream {
- public UnmarkableInputStream(InputStream in) {
- super(in);
- }
-
- @Override
- public void mark(int readlimit) {
- }
-
- @Override
- public void reset() throws IOException {
- }
-
- @Override
- public boolean markSupported() {
- return false;
- }
- }
-}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureBlobClientSideEncryptionUtils.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureBlobClientSideEncryptionUtils.java
deleted file mode 100644
index 6e7a2ce712..0000000000
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureBlobClientSideEncryptionUtils.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage.utils;
-
-import com.microsoft.azure.keyvault.cryptography.SymmetricKey;
-import org.apache.commons.codec.DecoderException;
-import org.apache.commons.codec.binary.Hex;
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.util.StringUtils;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-public class AzureBlobClientSideEncryptionUtils {
-
- private static final String DEFAULT_KEY_ID = "nifi";
-
- public static final PropertyDescriptor CSE_KEY_TYPE = new
PropertyDescriptor.Builder()
- .name("cse-key-type")
- .displayName("Client-Side Encryption Key Type")
- .required(true)
- .allowableValues(buildCseEncryptionMethodAllowableValues())
- .defaultValue(AzureBlobClientSideEncryptionMethod.NONE.name())
- .description("Specifies the key type to use for client-side
encryption.")
- .build();
-
- public static final PropertyDescriptor CSE_KEY_ID = new
PropertyDescriptor.Builder()
- .name("cse-key-id")
- .displayName("Client-Side Encryption Key ID")
- .description("Specifies the ID of the key to use for client-side
encryption.")
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .required(false)
- .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .dependsOn(CSE_KEY_TYPE,
AzureBlobClientSideEncryptionMethod.SYMMETRIC.name())
- .build();
-
- public static final PropertyDescriptor CSE_SYMMETRIC_KEY_HEX = new
PropertyDescriptor.Builder()
- .name("cse-symmetric-key-hex")
- .displayName("Symmetric Key")
- .description("When using symmetric client-side encryption, this is
the raw key, encoded in hexadecimal")
- .required(false)
- .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .dependsOn(CSE_KEY_TYPE,
AzureBlobClientSideEncryptionMethod.SYMMETRIC.name())
- .sensitive(true)
- .build();
-
- private static AllowableValue[] buildCseEncryptionMethodAllowableValues() {
- return Arrays.stream(AzureBlobClientSideEncryptionMethod.values())
- .map(v -> new AllowableValue(v.name(), v.name(),
v.getDescription()))
- .toArray(AllowableValue[]::new);
- }
-
- public static Collection<ValidationResult>
validateClientSideEncryptionProperties(ValidationContext validationContext) {
- final List<ValidationResult> validationResults = new ArrayList<>();
-
- final String cseKeyTypeValue =
validationContext.getProperty(CSE_KEY_TYPE).getValue();
- final AzureBlobClientSideEncryptionMethod cseKeyType =
AzureBlobClientSideEncryptionMethod.valueOf(cseKeyTypeValue);
-
- final String cseKeyId =
validationContext.getProperty(CSE_KEY_ID).getValue();
-
- final String cseSymmetricKeyHex =
validationContext.getProperty(CSE_SYMMETRIC_KEY_HEX).getValue();
-
- if (cseKeyType != AzureBlobClientSideEncryptionMethod.NONE &&
StringUtils.isBlank(cseKeyId)) {
- validationResults.add(new
ValidationResult.Builder().subject(CSE_KEY_ID.getDisplayName())
- .explanation("a key ID must be set when client-side
encryption is enabled.").build());
- }
-
- if (cseKeyType == AzureBlobClientSideEncryptionMethod.SYMMETRIC) {
- validationResults.addAll(validateSymmetricKey(cseSymmetricKeyHex));
- }
-
- return validationResults;
- }
-
- private static List<ValidationResult> validateSymmetricKey(String keyHex) {
- final List<ValidationResult> validationResults = new ArrayList<>();
- if (StringUtils.isBlank(keyHex)) {
- validationResults.add(new
ValidationResult.Builder().subject(CSE_SYMMETRIC_KEY_HEX.getDisplayName())
- .explanation("a symmetric key must not be set when
client-side encryption is enabled with symmetric encryption.").build());
- } else {
- byte[] keyBytes;
- try {
- keyBytes = Hex.decodeHex(keyHex.toCharArray());
- new SymmetricKey(DEFAULT_KEY_ID, keyBytes);
- } catch (DecoderException e) {
- validationResults.add(new
ValidationResult.Builder().subject(CSE_SYMMETRIC_KEY_HEX.getDisplayName())
- .explanation("the symmetric key must be a valid
hexadecimal string.").build());
- } catch (IllegalArgumentException e) {
- validationResults.add(new
ValidationResult.Builder().subject(CSE_SYMMETRIC_KEY_HEX.getDisplayName())
- .explanation(e.getMessage()).build());
- }
- }
-
- return validationResults;
- }
-
-}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
index 5f17fbd43e..18015f560b 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
@@ -32,7 +32,6 @@ import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
-import com.microsoft.azure.storage.blob.CloudBlobClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
@@ -41,7 +40,6 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
@@ -206,18 +204,6 @@ public final class AzureStorageUtils {
// do not instantiate
}
- /**
- * Create CloudBlobClient instance.
- * @param flowFile An incoming FlowFile can be used for NiFi Expression
Language evaluation to derive
- * Account Name, Account Key or SAS Token. This can be
null if not available.
- */
- public static CloudBlobClient createCloudBlobClient(ProcessContext
context, ComponentLog logger, FlowFile flowFile) throws URISyntaxException {
- final AzureStorageCredentialsDetails storageCredentialsDetails =
getStorageCredentialsDetails(context, flowFile);
- final CloudStorageAccount cloudStorageAccount =
getCloudStorageAccount(storageCredentialsDetails);
- final CloudBlobClient cloudBlobClient =
cloudStorageAccount.createCloudBlobClient();
- return cloudBlobClient;
- }
-
public static CloudStorageAccount getCloudStorageAccount(final
AzureStorageCredentialsDetails storageCredentialsDetails) throws
URISyntaxException {
final CloudStorageAccount cloudStorageAccount;
if (storageCredentialsDetails instanceof
AzureStorageEmulatorCredentialsDetails) {
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index ce7e074a0e..f9e86a4add 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -15,10 +15,6 @@
org.apache.nifi.processors.azure.eventhub.PutAzureEventHub
org.apache.nifi.processors.azure.eventhub.GetAzureEventHub
org.apache.nifi.processors.azure.eventhub.ConsumeAzureEventHub
-org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage
-org.apache.nifi.processors.azure.storage.ListAzureBlobStorage
-org.apache.nifi.processors.azure.storage.PutAzureBlobStorage
-org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage
org.apache.nifi.processors.azure.storage.queue.PutAzureQueueStorage
org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage
org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java
deleted file mode 100644
index afd6a97502..0000000000
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage;
-
-import com.microsoft.azure.storage.blob.CloudBlob;
-import com.microsoft.azure.storage.blob.CloudBlobClient;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.UUID;
-
-import static
org.apache.nifi.processors.azure.AzureServiceEndpoints.DEFAULT_BLOB_ENDPOINT_SUFFIX;
-
-public abstract class AbstractAzureBlobStorageIT extends
AbstractAzureStorageIT {
-
- protected static final String TEST_CONTAINER_NAME_PREFIX =
"nifi-test-container";
- protected static final String TEST_BLOB_NAME = "nifi-test-blob";
- protected static final String TEST_FILE_NAME = "nifi-test-file";
- protected static final String TEST_FILE_CONTENT =
"ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
-
- protected CloudBlobContainer container;
-
- @Override
- protected String getDefaultEndpointSuffix() {
- return DEFAULT_BLOB_ENDPOINT_SUFFIX;
- }
-
- @BeforeEach
- public void setUpAzureBlobStorageIT() throws Exception {
- String containerName = String.format("%s-%s",
TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID());
- CloudBlobClient blobClient =
getStorageAccount().createCloudBlobClient();
- container = blobClient.getContainerReference(containerName);
- container.createIfNotExists();
-
- runner.setProperty(AzureStorageUtils.CONTAINER, containerName);
- }
-
- @AfterEach
- public void tearDownAzureBlobStorageIT() throws Exception {
- container.deleteIfExists();
- }
-
- protected void uploadTestBlob() throws Exception {
- uploadTestBlob(TEST_BLOB_NAME, TEST_FILE_CONTENT);
- }
-
- protected void uploadTestBlob(final String blobName, final String
fileContent) throws Exception {
- CloudBlob blob = container.getBlockBlobReference(blobName);
- byte[] buf = fileContent.getBytes(StandardCharsets.UTF_8);
- InputStream in = new ByteArrayInputStream(buf);
- blob.upload(in, buf.length);
- }
-}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITAzureBlobStorageE2E.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITAzureBlobStorageE2E.java
deleted file mode 100644
index 37c30a7ca9..0000000000
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITAzureBlobStorageE2E.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage;
-
-import com.microsoft.azure.storage.CloudStorageAccount;
-import com.microsoft.azure.storage.StorageCredentials;
-import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
-import com.microsoft.azure.storage.blob.CloudBlobClient;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
-import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
-import
org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionMethod;
-import
org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.apache.nifi.util.file.FileUtils;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.fail;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-
-public class ITAzureBlobStorageE2E {
-
- private static final Properties CONFIG;
-
- private static final String CREDENTIALS_FILE =
System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
-
- static {
- CONFIG = new Properties();
- try {
- final FileInputStream fis = new FileInputStream(CREDENTIALS_FILE);
- assertDoesNotThrow(() -> CONFIG.load(fis),
- "Could not open credentials file " + CREDENTIALS_FILE);
- FileUtils.closeQuietly(fis);
- } catch (FileNotFoundException e) {
- fail("Could not open credentials file " + CREDENTIALS_FILE + ": "
+ e.getLocalizedMessage());
- }
- }
-
- protected static String getAccountName() {
- return CONFIG.getProperty("accountName");
- }
-
- protected static String getAccountKey() {
- return CONFIG.getProperty("accountKey");
- }
-
- protected static final String TEST_CONTAINER_NAME_PREFIX =
"nifi-test-container";
- protected static final String TEST_BLOB_NAME = "nifi-test-blob";
- protected static final String TEST_FILE_CONTENT =
"ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
-
- private static final String KEY_ID_VALUE = "key:id";
- private static final String KEY_64B_VALUE = "1234567890ABCDEF";
- private static final String KEY_128B_VALUE = KEY_64B_VALUE + KEY_64B_VALUE;
- private static final String KEY_192B_VALUE = KEY_128B_VALUE +
KEY_64B_VALUE;
- private static final String KEY_256B_VALUE = KEY_128B_VALUE +
KEY_128B_VALUE;
- private static final String KEY_384B_VALUE = KEY_256B_VALUE +
KEY_128B_VALUE;
- private static final String KEY_512B_VALUE = KEY_256B_VALUE +
KEY_256B_VALUE;
-
- protected TestRunner putRunner;
- protected TestRunner listRunner;
- protected TestRunner fetchRunner;
-
- protected CloudBlobContainer container;
-
- @BeforeEach
- public void setupRunners() throws Exception {
- putRunner = TestRunners.newTestRunner(new PutAzureBlobStorage());
- listRunner = TestRunners.newTestRunner(new ListAzureBlobStorage());
- fetchRunner = TestRunners.newTestRunner(new FetchAzureBlobStorage());
-
- String containerName = String.format("%s-%s",
TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID());
-
- StorageCredentials storageCredentials = new
StorageCredentialsAccountAndKey(getAccountName(), getAccountKey());
- CloudStorageAccount storageAccount = new
CloudStorageAccount(storageCredentials, true);
-
- CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
- container = blobClient.getContainerReference(containerName);
- container.createIfNotExists();
-
- setRunnerProperties(putRunner, containerName);
- setRunnerProperties(listRunner, containerName);
- setRunnerProperties(fetchRunner, containerName);
- }
-
- private void setRunnerProperties(TestRunner runner, String containerName) {
- runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, getAccountName());
- runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, getAccountKey());
- runner.setProperty(AzureStorageUtils.CONTAINER, containerName);
- }
-
- @AfterEach
- public void tearDownAzureContainer() throws Exception {
- container.deleteIfExists();
- }
-
- @Test
- public void AzureBlobStorageE2ENoCSE() throws Exception {
- testE2E(AzureBlobClientSideEncryptionMethod.NONE.name(),
- null,
- null,
- AzureBlobClientSideEncryptionMethod.NONE.name(),
- null,
- null
- );
- }
-
- @Test
- public void AzureBlobStorageE2E128BCSE() throws Exception {
- testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
- KEY_ID_VALUE,
- KEY_128B_VALUE,
- AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
- KEY_ID_VALUE,
- KEY_128B_VALUE
- );
- }
-
- @Test
- public void AzureBlobStorageE2E192BCSE() throws Exception {
- testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
- KEY_ID_VALUE,
- KEY_192B_VALUE,
- AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
- KEY_ID_VALUE,
- KEY_192B_VALUE
- );
- }
-
- @Test
- public void AzureBlobStorageE2E256BCSE() throws Exception {
- testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
- KEY_ID_VALUE,
- KEY_256B_VALUE,
- AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
- KEY_ID_VALUE,
- KEY_256B_VALUE
- );
- }
-
- @Test
- public void AzureBlobStorageE2E384BCSE() throws Exception {
- testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
- KEY_ID_VALUE,
- KEY_384B_VALUE,
- AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
- KEY_ID_VALUE,
- KEY_384B_VALUE
- );
- }
-
- @Test
- public void AzureBlobStorageE2E512BCSE() throws Exception {
- testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
- KEY_ID_VALUE,
- KEY_512B_VALUE,
- AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
- KEY_ID_VALUE,
- KEY_512B_VALUE
- );
- }
-
- @Test
- public void AzureBlobStorageE2E128BCSENoDecryption() {
- assertThrows(Exception.class, () ->
testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
- KEY_ID_VALUE,
- KEY_128B_VALUE,
- AzureBlobClientSideEncryptionMethod.NONE.name(),
- KEY_ID_VALUE,
- KEY_128B_VALUE
- ));
- }
-
- private void testE2E(String encryptionKeyType, String encryptionKeyId,
String encryptionKeyHex, String decryptionKeyType, String decryptionKeyId,
String decryptionKeyHex) throws Exception {
- putRunner.setProperty(PutAzureBlobStorage.BLOB, TEST_BLOB_NAME);
- putRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE,
encryptionKeyType);
- if (encryptionKeyId == null || encryptionKeyId.isEmpty() ||
encryptionKeyId.trim().isEmpty()) {
-
putRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID);
- } else {
-
putRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID,
encryptionKeyId);
- }
- if (encryptionKeyHex == null || encryptionKeyHex.isEmpty() ||
encryptionKeyHex.trim().isEmpty()) {
-
putRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX);
- } else {
-
putRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX,
encryptionKeyHex);
- }
-
- putRunner.assertValid();
- putRunner.enqueue(TEST_FILE_CONTENT.getBytes());
- putRunner.run();
-
putRunner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
-
-
Thread.sleep(ListAzureBlobStorage.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS) * 2);
-
- listRunner.assertValid();
- listRunner.run();
-
listRunner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
-
- MockFlowFile entry =
listRunner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS).get(0);
- entry.assertAttributeEquals("mime.type", "application/octet-stream");
-
-
fetchRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE,
decryptionKeyType);
- if (decryptionKeyId == null || decryptionKeyId.isEmpty() ||
decryptionKeyId.trim().isEmpty()) {
-
fetchRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID);
- } else {
-
fetchRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID,
decryptionKeyId);
- }
- if (decryptionKeyHex == null || decryptionKeyHex.isEmpty() ||
decryptionKeyHex.trim().isEmpty()) {
-
fetchRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX);
- } else {
-
fetchRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX,
decryptionKeyHex);
- }
- fetchRunner.assertValid();
- fetchRunner.enqueue(entry);
- fetchRunner.run();
-
fetchRunner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_SUCCESS,
1);
- MockFlowFile fetchedEntry =
fetchRunner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS).get(0);
- fetchedEntry.assertContentEquals(TEST_FILE_CONTENT);
- }
-
-}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage.java
deleted file mode 100644
index 26b8fea1bf..0000000000
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage;
-
-import com.microsoft.azure.storage.blob.ListBlobItem;
-import org.apache.nifi.processor.Processor;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertFalse;
-
-public class ITDeleteAzureBlobStorage extends AbstractAzureBlobStorageIT {
-
- @Override
- protected Class<? extends Processor> getProcessorClass() {
- return DeleteAzureBlobStorage.class;
- }
-
- @BeforeEach
- public void setUp() throws Exception {
- runner.setProperty(DeleteAzureBlobStorage.BLOB, TEST_BLOB_NAME);
-
- uploadTestBlob();
- }
-
- @Test
- public void testDeleteBlob() {
- runner.assertValid();
- runner.enqueue(new byte[0]);
- runner.run(1);
-
- assertResult();
- }
-
- @Test
- public void testDeleteBlobUsingCredentialsService() throws Exception {
- configureCredentialsService();
-
- runner.assertValid();
- runner.enqueue(new byte[0]);
- runner.run(1);
-
- assertResult();
- }
-
- private void assertResult() {
-
runner.assertAllFlowFilesTransferred(DeleteAzureBlobStorage.REL_SUCCESS);
-
- Iterable<ListBlobItem> blobs = container.listBlobs(TEST_BLOB_NAME);
- assertFalse(blobs.iterator().hasNext());
- }
-}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
deleted file mode 100644
index cd7e062901..0000000000
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage;
-
-import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
-import org.apache.nifi.util.MockFlowFile;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.util.List;
-
-public class ITFetchAzureBlobStorage extends AbstractAzureBlobStorageIT {
-
- @Override
- protected Class<? extends Processor> getProcessorClass() {
- return FetchAzureBlobStorage.class;
- }
-
- @BeforeEach
- public void setUp() throws Exception {
- runner.setProperty(FetchAzureBlobStorage.BLOB, TEST_BLOB_NAME);
-
- uploadTestBlob();
- }
-
- @Test
- public void testFetchBlob() throws Exception {
- runner.assertValid();
- runner.enqueue(new byte[0]);
- runner.run();
-
- assertResult();
- }
-
- @Test
- public void testFetchBlobWithRangeZeroOne() throws Exception {
- runner.setProperty(FetchAzureBlobStorage.RANGE_START, "0B");
- runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "1B");
- runner.assertValid();
- runner.enqueue(new byte[0]);
- runner.run();
-
- assertResult(TEST_FILE_CONTENT.substring(0, 1));
- }
-
- @Test
- public void testFetchBlobWithRangeOneOne() throws Exception {
- runner.setProperty(FetchAzureBlobStorage.RANGE_START, "1B");
- runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "1B");
- runner.assertValid();
- runner.enqueue(new byte[0]);
- runner.run();
-
- assertResult(TEST_FILE_CONTENT.substring(1, 2));
- }
-
- @Test
- public void testFetchBlobWithRangeTwentyThreeTwentySix() throws Exception {
- runner.setProperty(FetchAzureBlobStorage.RANGE_START, "23B");
- runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "3B");
- runner.assertValid();
- runner.enqueue(new byte[0]);
- runner.run();
-
- assertResult(TEST_FILE_CONTENT.substring(23, 26));
- }
-
- @Test
- public void testFetchBlobWithRangeLengthGreater() throws Exception {
- runner.setProperty(FetchAzureBlobStorage.RANGE_START, "0B");
- runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "1KB");
- runner.assertValid();
- runner.enqueue(new byte[0]);
- runner.run();
-
- assertResult(TEST_FILE_CONTENT);
- }
-
- @Test
- public void testFetchBlobWithRangeLengthUnset() throws Exception {
- runner.setProperty(FetchAzureBlobStorage.RANGE_START, "0B");
- runner.assertValid();
- runner.enqueue(new byte[0]);
- runner.run();
-
- assertResult(TEST_FILE_CONTENT);
- }
-
- @Test
- public void testFetchBlobWithRangeStartOutOfRange() throws Exception {
- runner.setProperty(FetchAzureBlobStorage.RANGE_START,
String.format("%sB", TEST_FILE_CONTENT.length() + 1));
- runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "1B");
- runner.assertValid();
- runner.enqueue(new byte[0]);
- runner.run();
-
-
runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_FAILURE, 1);
- }
-
- @Test
- public void testFetchBlobUsingCredentialService() throws Exception {
- configureCredentialsService();
-
- runner.assertValid();
- runner.enqueue(new byte[0]);
- runner.run();
-
- assertResult();
- }
-
- private void assertResult() throws Exception {
- assertResult(TEST_FILE_CONTENT);
- }
-
- private void assertResult(final String expectedContent) throws Exception {
-
runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_SUCCESS, 1);
- List<MockFlowFile> flowFilesForRelationship =
runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS);
- for (MockFlowFile flowFile : flowFilesForRelationship) {
- flowFile.assertContentEquals(expectedContent);
- flowFile.assertAttributeEquals("azure.length",
String.valueOf(TEST_FILE_CONTENT.length()));
- }
- }
-}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
deleted file mode 100644
index ef1670bdd9..0000000000
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage;
-
-import org.apache.nifi.processor.Processor;
-import org.apache.nifi.util.MockFlowFile;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.StreamSupport;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-public class ITListAzureBlobStorage extends AbstractAzureBlobStorageIT {
-
- @Override
- protected Class<? extends Processor> getProcessorClass() {
- return ListAzureBlobStorage.class;
- }
-
- @BeforeEach
- public void setUp() throws Exception {
- uploadTestBlob();
- waitForUpload();
- }
-
- @Test
- public void testListBlobs() throws Exception {
- runner.assertValid();
- runner.run(1);
-
- assertResult();
- }
-
- @Test
- public void testListBlobsUsingCredentialService() throws Exception {
- configureCredentialsService();
-
- runner.assertValid();
- runner.run(1);
-
- assertResult();
- }
-
- @Test
- public void testListWithMinAge() throws Exception {
- runner.setProperty(ListAzureBlobStorage.MIN_AGE, "1 hour");
-
- runner.assertValid();
- runner.run(1);
-
- runner.assertTransferCount(ListAzureBlobStorage.REL_SUCCESS, 0);
- }
-
- @Test
- public void testListWithMaxAge() throws Exception {
- runner.setProperty(ListAzureBlobStorage.MAX_AGE, "1 hour");
-
- runner.assertValid();
- runner.run(1);
-
- assertResult(TEST_FILE_CONTENT);
- }
-
- @Test
- public void testListWithMinSize() throws Exception {
- uploadTestBlob("nifi-test-blob2", "Test");
- waitForUpload();
- assertListCount();
- runner.setProperty(ListAzureBlobStorage.MIN_SIZE, "5 B");
-
- runner.assertValid();
- runner.run(1);
-
- assertResult(TEST_FILE_CONTENT);
- }
-
- @Test
- public void testListWithMaxSize() throws Exception {
- uploadTestBlob("nifi-test-blob2", "Test");
- waitForUpload();
- assertListCount();
- runner.setProperty(ListAzureBlobStorage.MAX_SIZE, "5 B");
-
- runner.assertValid();
- runner.run(1);
-
- assertResult("Test");
- }
-
- private void waitForUpload() throws InterruptedException {
-
Thread.sleep(ListAzureBlobStorage.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS) * 2);
- }
-
- private void assertResult() {
- assertResult(TEST_FILE_CONTENT);
- }
-
- private void assertResult(final String content) {
- runner.assertTransferCount(ListAzureBlobStorage.REL_SUCCESS, 1);
- runner.assertAllFlowFilesTransferred(ListAzureBlobStorage.REL_SUCCESS,
1);
-
- for (MockFlowFile entry :
runner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS)) {
- entry.assertAttributeEquals("azure.length",
String.valueOf(content.getBytes(StandardCharsets.UTF_8).length));
- entry.assertAttributeEquals("mime.type",
"application/octet-stream");
- }
- }
-
- private void assertListCount() {
- final long listCount =
StreamSupport.stream(container.listBlobs().spliterator(), false).count();
- assertEquals(2, listCount, "There should be 2 uploaded files but found
only " + listCount);
- }
-}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java
deleted file mode 100644
index 212b48d8b1..0000000000
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage;
-
-import com.microsoft.azure.storage.blob.ListBlobItem;
-import org.apache.nifi.processor.Processor;
-import
org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionMethod;
-import
org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
-import org.apache.nifi.util.MockFlowFile;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class ITPutAzureBlobStorage extends AbstractAzureBlobStorageIT {
-
- public static final String TEST_FILE_CONTENT = "0123456789";
- private static final String KEY_ID_VALUE = "key:id";
- private static final String KEY_64B_VALUE = "1234567890ABCDEF";
- private static final String KEY_128B_VALUE = KEY_64B_VALUE + KEY_64B_VALUE;
- private static final String KEY_192B_VALUE = KEY_128B_VALUE +
KEY_64B_VALUE;
- private static final String KEY_256B_VALUE = KEY_128B_VALUE +
KEY_128B_VALUE;
- private static final String KEY_384B_VALUE = KEY_256B_VALUE +
KEY_128B_VALUE;
- private static final String KEY_512B_VALUE = KEY_256B_VALUE +
KEY_256B_VALUE;
-
-
- @Override
- protected Class<? extends Processor> getProcessorClass() {
- return PutAzureBlobStorage.class;
- }
-
- @BeforeEach
- public void setUp() {
- runner.setProperty(PutAzureBlobStorage.BLOB, TEST_BLOB_NAME);
- }
-
- @Test
- public void testPutBlob() throws Exception {
- runner.assertValid();
- runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8));
- runner.run();
-
- assertResult();
- }
-
- @Test
- public void testPutBlob64BSymmetricCSE() {
- runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE,
AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
- runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID,
KEY_ID_VALUE);
-
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX,
KEY_64B_VALUE);
- runner.assertNotValid();
- }
-
- @Test
- public void testPutBlob128BSymmetricCSE() throws Exception {
- runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE,
AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
- runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID,
KEY_ID_VALUE);
-
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX,
KEY_128B_VALUE);
- runner.assertValid();
- runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8));
- runner.run();
-
- assertResult();
- }
-
- @Test
- public void testPutBlob192BSymmetricCSE() throws Exception {
- runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE,
AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
- runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID,
KEY_ID_VALUE);
-
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX,
KEY_192B_VALUE);
- runner.assertValid();
- runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8));
- runner.run();
-
- assertResult();
- }
-
- @Test
- public void testPutBlob256BSymmetricCSE() throws Exception {
- runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE,
AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
- runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID,
KEY_ID_VALUE);
-
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX,
KEY_256B_VALUE);
- runner.assertValid();
- runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8));
- runner.run();
-
- assertResult();
- }
-
- @Test
- public void testPutBlob384BSymmetricCSE() throws Exception {
- runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE,
AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
- runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID,
KEY_ID_VALUE);
-
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX,
KEY_384B_VALUE);
- runner.assertValid();
- runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8));
- runner.run();
-
- assertResult();
- }
-
- @Test
- public void testPutBlob512BSymmetricCSE() throws Exception {
- runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE,
AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
- runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID,
KEY_ID_VALUE);
-
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX,
KEY_512B_VALUE);
- runner.assertValid();
- runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8));
- runner.run();
-
- assertResult();
- }
-
- @Test
- public void testPutBlobUsingCredentialsService() throws Exception {
- configureCredentialsService();
-
- runner.assertValid();
- runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8));
- runner.run();
-
- assertResult();
- }
-
- @Test
- public void testInvalidCredentialsRoutesToFailure() {
- runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "invalid");
- runner.setProperty(AzureStorageUtils.ACCOUNT_KEY,
"aW52YWxpZGludmFsaWQ=");
- runner.assertValid();
- runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8));
- runner.run();
-
- runner.assertTransferCount(PutAzureBlobStorage.REL_FAILURE, 1);
- }
-
- private void assertResult() throws Exception {
- runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS,
1);
- List<MockFlowFile> flowFilesForRelationship =
runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS);
- for (MockFlowFile flowFile : flowFilesForRelationship) {
-
flowFile.assertContentEquals(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8));
- flowFile.assertAttributeEquals("azure.length", "10");
- }
-
- Iterable<ListBlobItem> blobs = container.listBlobs(TEST_BLOB_NAME);
- assertTrue(blobs.iterator().hasNext());
- }
-}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage.java
deleted file mode 100644
index c139b04460..0000000000
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage;
-
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doThrow;
-
-public class TestPutAzureBlobStorage {
-
- @Test
- public void testIOExceptionDuringUploadTransfersToFailure() throws
Exception {
- PutAzureBlobStorage processor = Mockito.spy(new PutAzureBlobStorage());
- doThrow(IOException.class).when(processor).uploadBlob(any(), any(),
any(), any());
-
- TestRunner runner = TestRunners.newTestRunner(processor);
- runner.setProperty(PutAzureBlobStorage.BLOB, "test");
- runner.setProperty(AzureStorageUtils.CONTAINER, "test");
- runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "test");
- runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "test");
-
- runner.enqueue("test data");
- runner.run();
-
- runner.assertTransferCount(PutAzureBlobStorage.REL_FAILURE, 1);
- }
-}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestPutAzureQueueStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestPutAzureQueueStorage.java
index f54cb08fae..68f2fcd76b 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestPutAzureQueueStorage.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestPutAzureQueueStorage.java
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.processors.azure.storage.queue;
-import com.microsoft.azure.storage.StorageException;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
@@ -25,8 +24,6 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
-import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
@@ -39,7 +36,7 @@ public class TestPutAzureQueueStorage {
private final TestRunner runner =
TestRunners.newTestRunner(PutAzureQueueStorage.class);
@Test
- public void testInvalidTTLAndVisibilityDelay() throws StorageException,
URISyntaxException, InvalidKeyException {
+ public void testInvalidTTLAndVisibilityDelay() {
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "dummy-storage");
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "dummy-key");
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureBlobClientSideEncryptionUtils.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureBlobClientSideEncryptionUtils.java
deleted file mode 100644
index 5c338f226f..0000000000
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureBlobClientSideEncryptionUtils.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage.utils;
-
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processors.azure.storage.PutAzureBlobStorage;
-import org.apache.nifi.util.MockProcessContext;
-import org.apache.nifi.util.MockValidationContext;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.util.Collection;
-
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class TestAzureBlobClientSideEncryptionUtils {
- private static final String KEY_ID_VALUE = "key:id";
- private static final String KEY_64B_VALUE = "1234567890ABCDEF";
- private static final String KEY_128B_VALUE = KEY_64B_VALUE + KEY_64B_VALUE;
- private static final String KEY_192B_VALUE = KEY_128B_VALUE +
KEY_64B_VALUE;
- private static final String KEY_256B_VALUE = KEY_128B_VALUE +
KEY_128B_VALUE;
- private static final String KEY_384B_VALUE = KEY_256B_VALUE +
KEY_128B_VALUE;
- private static final String KEY_512B_VALUE = KEY_256B_VALUE +
KEY_256B_VALUE;
-
- private MockProcessContext processContext;
- private MockValidationContext validationContext;
-
- @BeforeEach
- public void setUp() {
- Processor processor = new PutAzureBlobStorage();
- processContext = new MockProcessContext(processor);
- validationContext = new MockValidationContext(processContext);
- }
-
- @Test
- public void testNoCesConfiguredOnProcessor() {
- configureProcessorProperties("NONE", null,null);
-
- Collection<ValidationResult> result =
AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
-
- assertValid(result);
- }
-
- @Test
- public void testSymmetricCesNoKeyIdOnProcessor() {
- configureProcessorProperties("SYMMETRIC", null, KEY_128B_VALUE);
-
- Collection<ValidationResult> result =
AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
-
- assertNotValid(result);
- }
-
- @Test
- public void testSymmetricCesNoKeyOnProcessor() {
- configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE,null);
-
- Collection<ValidationResult> result =
AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
-
- assertNotValid(result);
- }
-
- @Test
- public void testSymmetricCesInvalidHexKeyOnProcessor() {
- configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE,"ZZ");
-
- Collection<ValidationResult> result =
AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
-
- assertNotValid(result);
- }
-
- @Test
- public void testSymmetricCes64BitKeyOnProcessor() {
- configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_64B_VALUE);
-
- Collection<ValidationResult> result =
AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
-
- assertNotValid(result);
- }
-
- @Test
- public void testSymmetricCes128BitKeyOnProcessor() {
- configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE,
KEY_128B_VALUE);
-
- Collection<ValidationResult> result =
AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
-
- assertValid(result);
- }
-
- @Test
- public void testSymmetricCes192BitKeyOnProcessor() {
- configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE,
KEY_192B_VALUE);
-
- Collection<ValidationResult> result =
AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
-
- assertValid(result);
- }
-
- @Test
- public void testSymmetricCes256BitKeyOnProcessor() {
- configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE,
KEY_256B_VALUE);
-
- Collection<ValidationResult> result =
AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
-
- assertValid(result);
- }
-
- @Test
- public void testSymmetricCes384BitKeyOnProcessor() {
- configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE,
KEY_384B_VALUE);
-
- Collection<ValidationResult> result =
AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
-
- assertValid(result);
- }
-
- @Test
- public void testSymmetricCes512BitKeyOnProcessor() {
- configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE,
KEY_512B_VALUE);
-
- Collection<ValidationResult> result =
AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
-
- assertValid(result);
- }
-
- private void configureProcessorProperties(String keyType, String keyId,
String symmetricKeyHex) {
- if (keyType != null) {
-
processContext.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE,
keyType);
- }
- if (keyId != null) {
-
processContext.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID,
keyId);
- }
- if (symmetricKeyHex != null) {
-
processContext.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX,
symmetricKeyHex);
- }
- }
-
- private void assertValid(Collection<ValidationResult> result) {
- assertTrue(result.isEmpty(), "There should be no validation error");
- }
-
- private void assertNotValid(Collection<ValidationResult> result) {
- assertFalse(result.isEmpty(), "There should be validation error");
- }
-}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java
deleted file mode 100644
index 56b973a2a3..0000000000
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage.utils;
-
-import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
-import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
-import com.microsoft.azure.storage.core.Base64;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processors.azure.storage.ListAzureBlobStorage;
-import
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService;
-import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails;
-import org.apache.nifi.util.MockConfigurationContext;
-import org.apache.nifi.util.MockProcessContext;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class TestAzureStorageUtilsGetStorageCredentialsDetails {
-
- private static final String CREDENTIALS_SERVICE_VALUE =
"CredentialsService";
- private static final String ACCOUNT_NAME_VALUE = "AccountName";
- private static final String ACCOUNT_KEY_VALUE =
Base64.encode("AccountKey".getBytes());
- private static final String SAS_TOKEN_VALUE = "SasToken";
-
- private MockProcessContext processContext;
-
- @BeforeEach
- public void setUp() {
- Processor processor = new ListAzureBlobStorage();
- processContext = new MockProcessContext(processor);
- }
-
- @Test
- public void testAccountNameAndAccountKeyConfiguredOnProcessor() {
- configureProcessorProperties(ACCOUNT_NAME_VALUE, ACCOUNT_KEY_VALUE,
null);
-
- AzureStorageCredentialsDetails storageCredentialsDetails =
AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
-
-
assertStorageCredentialsDetailsAccountNameAndAccountKey(storageCredentialsDetails);
- }
-
- @Test
- public void testAccountNameAndSasTokenConfiguredOnProcessor() {
- configureProcessorProperties(ACCOUNT_NAME_VALUE, null,
SAS_TOKEN_VALUE);
-
- AzureStorageCredentialsDetails storageCredentialsDetails =
AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
-
-
assertStorageCredentialsDetailsAccountNameAndSasToken(storageCredentialsDetails);
- }
-
- @Test
- public void testAccountNameAndAccountKeyConfiguredOnControllerService() {
- configureControllerService(ACCOUNT_NAME_VALUE, ACCOUNT_KEY_VALUE,
null);
-
- AzureStorageCredentialsDetails storageCredentialsDetails =
AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
-
-
assertStorageCredentialsDetailsAccountNameAndAccountKey(storageCredentialsDetails);
- }
-
- @Test
- public void testAccountNameAndSasTokenConfiguredOnControllerService() {
- configureControllerService(ACCOUNT_NAME_VALUE, null, SAS_TOKEN_VALUE);
-
- AzureStorageCredentialsDetails storageCredentialsDetails =
AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
-
-
assertStorageCredentialsDetailsAccountNameAndSasToken(storageCredentialsDetails);
- }
-
- @Test
- public void testAccountNameMissingConfiguredOnProcessor() {
- configureProcessorProperties(null, ACCOUNT_KEY_VALUE, null);
-
- assertThrows(IllegalArgumentException.class, () ->
AzureStorageUtils.getStorageCredentialsDetails(processContext, null));
- }
-
- @Test
- public void testAccountKeyAndSasTokenMissingConfiguredOnProcessor() {
- configureProcessorProperties(ACCOUNT_NAME_VALUE, null, null);
-
- assertThrows(IllegalArgumentException.class, () ->
AzureStorageUtils.getStorageCredentialsDetails(processContext, null));
- }
-
- @Test
- public void testAccountNameMissingConfiguredOnControllerService() {
- configureControllerService(null, ACCOUNT_KEY_VALUE, null);
-
- assertThrows(IllegalArgumentException.class, () ->
AzureStorageUtils.getStorageCredentialsDetails(processContext, null));
- }
-
- @Test
- public void
testAccountKeyAndSasTokenMissingConfiguredOnControllerService() {
- configureControllerService(ACCOUNT_NAME_VALUE, null, null);
-
- assertThrows(IllegalArgumentException.class, () ->
AzureStorageUtils.getStorageCredentialsDetails(processContext, null));
- }
-
- private void configureProcessorProperties(String accountName, String
accountKey, String sasToken) {
- if (accountName != null) {
- processContext.setProperty(AzureStorageUtils.ACCOUNT_NAME,
accountName);
- }
- if (accountKey != null) {
- processContext.setProperty(AzureStorageUtils.ACCOUNT_KEY,
accountKey);
- }
- if (sasToken != null) {
- processContext.setProperty(AzureStorageUtils.PROP_SAS_TOKEN,
sasToken);
- }
- }
-
- private void configureControllerService(String accountName, String
accountKey, String sasToken) {
- AzureStorageCredentialsControllerService credentialsService = new
AzureStorageCredentialsControllerService();
-
- Map<PropertyDescriptor, String> properties = new HashMap<>();
- if (accountName != null) {
- properties.put(AzureStorageUtils.ACCOUNT_NAME, accountName);
- }
- if (accountKey != null) {
- properties.put(AzureStorageUtils.ACCOUNT_KEY, accountKey);
- }
- if (sasToken != null) {
- properties.put(AzureStorageUtils.PROP_SAS_TOKEN, sasToken);
- }
-
- MockConfigurationContext configurationContext = new
MockConfigurationContext(properties, null);
- credentialsService.onEnabled(configurationContext);
-
- processContext.addControllerService(credentialsService,
CREDENTIALS_SERVICE_VALUE);
-
processContext.setProperty(AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE,
CREDENTIALS_SERVICE_VALUE);
- }
-
- private void
assertStorageCredentialsDetailsAccountNameAndAccountKey(AzureStorageCredentialsDetails
storageCredentialsDetails) {
- assertEquals(ACCOUNT_NAME_VALUE,
storageCredentialsDetails.getStorageAccountName());
- assertTrue(storageCredentialsDetails.getStorageCredentials()
instanceof StorageCredentialsAccountAndKey);
- StorageCredentialsAccountAndKey storageCredentials =
(StorageCredentialsAccountAndKey)
storageCredentialsDetails.getStorageCredentials();
- assertEquals(ACCOUNT_NAME_VALUE, storageCredentials.getAccountName());
- assertEquals(ACCOUNT_KEY_VALUE,
storageCredentials.exportBase64EncodedKey());
- }
-
- private void
assertStorageCredentialsDetailsAccountNameAndSasToken(AzureStorageCredentialsDetails
storageCredentialsDetails) {
- assertEquals(ACCOUNT_NAME_VALUE,
storageCredentialsDetails.getStorageAccountName());
- assertTrue(storageCredentialsDetails.getStorageCredentials()
instanceof StorageCredentialsSharedAccessSignature);
- StorageCredentialsSharedAccessSignature storageCredentials =
(StorageCredentialsSharedAccessSignature)
storageCredentialsDetails.getStorageCredentials();
- assertEquals(SAS_TOKEN_VALUE, storageCredentials.getToken());
- }
-}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsValidateCredentialProperties.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsValidateCredentialProperties.java
index be956eb2c8..ac88c2fdeb 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsValidateCredentialProperties.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsValidateCredentialProperties.java
@@ -18,7 +18,7 @@ package org.apache.nifi.processors.azure.storage.utils;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processors.azure.storage.ListAzureBlobStorage;
+import org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.MockValidationContext;
import org.junit.jupiter.api.BeforeEach;
@@ -41,7 +41,7 @@ public class
TestAzureStorageUtilsValidateCredentialProperties {
@BeforeEach
public void setUp() {
- Processor processor = new ListAzureBlobStorage();
+ Processor processor = new GetAzureQueueStorage();
processContext = new MockProcessContext(processor);
validationContext = new MockValidationContext(processContext);
}