NIFI-4199: Added ProxyConfigurationService - Added ProxyConfigurationService to manage centralized proxy configurations - Adopted ProxyConfigurationService in FTP and HTTP processors
NIFI-4175 - Add HTTP proxy support to *SFTP processors This closes #2018. Signed-off-by: Koji Kawamura <[email protected]> NIFI-4199: Add ProxyConfigurationService to SFTP processors - Fixed check style issue - Use the same proxy related PropertyDescriptors from FTPTransfer and SFTPTransfer - Dropped FlowFile EL evaluation support to make it align with other processors spec, Now it supports VARIABLE_REGISTRY - Added ProxyConfigurationService to SFTP processors - Added SOCKS proxy support to SFTP processors NIFI-4199: Added ProxyConfigurationService to ElasticsearchHttp processors - ElasticsearchHttp processors now support SOCKS proxy, too - Added proxy support to PutElasticsearchHttpRecord - Moved more common property descriptors to AbstractElasticsearchHttpProcessor and just return static unmodifiable property descriptor list at each implementation processors NIFI-4196 - Expose AWS proxy authentication settings NIFI-4196 - Fix jUnit errors This closes #2016. Signed-off-by: Koji Kawamura <[email protected]> NIFI-4199: Add ProxyConfigService to AWS processors - Applied ProxyConfigService to S3 processors - Added proxy support to following processors: - PutKinesisFirehose, PutKinesisStream - PutDynamoDB, DeleteDynamoDB, GetDynamoDB - PutKinesisStream - All AWS processors support HTTP proxy now NIFI-4199: Add ProxyConfigService to Azure processors NIFI-4199: More explicit validation and docs for Proxy spec - Each processor has different supporting Proxy specs - Show supported spec to ProxyConfigurationService property doc - Validate not only Proxy type, but also with Authentication NIFI-4199: Incorporated review comments - Fixed TestListS3 property descriptor check - Separate name and displayName This closes #2016 This closes #2018 This closes #2704 Signed-off-by: Mike Thomsen <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2834fa4c Tree: 
http://git-wip-us.apache.org/repos/asf/nifi/tree/2834fa4c Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2834fa4c Branch: refs/heads/master Commit: 2834fa4ce477014dfad8c96b6d326d0f1f06a23e Parents: d79216d Author: Koji Kawamura <[email protected]> Authored: Tue May 15 14:52:39 2018 +0900 Committer: Mike Thomsen <[email protected]> Committed: Sun May 20 17:57:47 2018 -0400 ---------------------------------------------------------------------- nifi-assembly/pom.xml | 6 + .../nifi-aws-abstract-processors/pom.xml | 4 + .../processors/aws/AbstractAWSProcessor.java | 57 ++++- .../aws/cloudwatch/PutCloudWatchMetric.java | 2 +- .../processors/aws/dynamodb/DeleteDynamoDB.java | 3 +- .../processors/aws/dynamodb/GetDynamoDB.java | 3 +- .../processors/aws/dynamodb/PutDynamoDB.java | 3 +- .../kinesis/firehose/PutKinesisFirehose.java | 2 +- .../aws/kinesis/stream/PutKinesisStream.java | 2 +- .../nifi/processors/aws/lambda/PutLambda.java | 4 +- .../nifi/processors/aws/s3/DeleteS3Object.java | 2 +- .../nifi/processors/aws/s3/FetchS3Object.java | 2 +- .../apache/nifi/processors/aws/s3/ListS3.java | 3 +- .../nifi/processors/aws/s3/PutS3Object.java | 2 +- .../apache/nifi/processors/aws/sns/PutSNS.java | 2 +- .../processors/aws/s3/TestDeleteS3Object.java | 8 +- .../processors/aws/s3/TestFetchS3Object.java | 9 +- .../nifi/processors/aws/s3/TestListS3.java | 11 +- .../nifi/processors/aws/s3/TestPutS3Object.java | 8 +- .../nifi-azure-processors/pom.xml | 4 + .../azure/AbstractAzureBlobProcessor.java | 7 +- .../azure/storage/DeleteAzureBlobStorage.java | 6 +- .../azure/storage/FetchAzureBlobStorage.java | 6 +- .../azure/storage/ListAzureBlobStorage.java | 13 +- .../azure/storage/PutAzureBlobStorage.java | 6 +- .../storage/queue/GetAzureQueueStorage.java | 11 +- .../storage/queue/PutAzureQueueStorage.java | 11 +- .../azure/storage/utils/AzureStorageUtils.java | 16 ++ .../nifi-elasticsearch-processors/pom.xml | 4 + .../AbstractElasticsearchHttpProcessor.java | 75 ++++--- 
.../elasticsearch/FetchElasticsearchHttp.java | 12 +- .../elasticsearch/PutElasticsearchHttp.java | 12 +- .../PutElasticsearchHttpRecord.java | 8 +- .../elasticsearch/QueryElasticsearchHttp.java | 12 +- .../elasticsearch/ScrollElasticsearchHttp.java | 12 +- .../nifi-standard-processors/pom.xml | 4 + .../nifi/processors/standard/FetchFTP.java | 11 + .../nifi/processors/standard/FetchSFTP.java | 17 ++ .../apache/nifi/processors/standard/GetFTP.java | 11 + .../nifi/processors/standard/GetHTTP.java | 36 +--- .../nifi/processors/standard/GetSFTP.java | 9 + .../nifi/processors/standard/InvokeHTTP.java | 56 ++--- .../nifi/processors/standard/ListFTP.java | 11 + .../nifi/processors/standard/ListSFTP.java | 17 ++ .../nifi/processors/standard/PostHTTP.java | 38 +--- .../apache/nifi/processors/standard/PutFTP.java | 11 + .../nifi/processors/standard/PutSFTP.java | 16 ++ .../processors/standard/util/FTPTransfer.java | 43 +++- .../nifi/processors/standard/util/FTPUtils.java | 1 + .../processors/standard/util/HTTPUtils.java | 74 +++++++ .../processors/standard/util/SFTPTransfer.java | 38 ++++ .../nifi-proxy-configuration-api/pom.xml | 43 ++++ .../apache/nifi/proxy/ProxyConfiguration.java | 207 +++++++++++++++++++ .../nifi/proxy/ProxyConfigurationService.java | 44 ++++ .../java/org/apache/nifi/proxy/ProxySpec.java | 43 ++++ .../nifi/proxy/TestProxyConfiguration.java | 166 +++++++++++++++ .../nifi-proxy-configuration-nar/pom.xml | 42 ++++ .../nifi-proxy-configuration/pom.xml | 45 ++++ .../StandardProxyConfigurationService.java | 128 ++++++++++++ ...org.apache.nifi.controller.ControllerService | 16 ++ .../nifi-proxy-configuration-bundle/pom.xml | 34 +++ .../nifi-standard-services-api-nar/pom.xml | 5 + nifi-nar-bundles/nifi-standard-services/pom.xml | 2 + nifi-nar-bundles/pom.xml | 5 + 64 files changed, 1326 insertions(+), 195 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 344e30c..e610aa0 100755 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -703,6 +703,12 @@ language governing permissions and limitations under the License. --> <version>1.7.0-SNAPSHOT</version> <type>nar</type> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-proxy-configuration-nar</artifactId> + <version>1.7.0-SNAPSHOT</version> + <type>nar</type> + </dependency> </dependencies> <profiles> <profile> http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml index caa95dc..e41a921 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml @@ -76,6 +76,10 @@ <artifactId>nifi-utils</artifactId> <version>1.7.0-SNAPSHOT</version> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-proxy-configuration-api</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java index a978ef4..f0be384 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java @@ -28,6 +28,7 @@ import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; import java.io.File; import java.io.IOException; +import java.net.Proxy; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -53,6 +54,8 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors; import org.apache.nifi.processors.aws.regions.AWSRegions; +import org.apache.nifi.proxy.ProxyConfiguration; +import org.apache.nifi.proxy.ProxySpec; import org.apache.nifi.ssl.SSLContextService; /** @@ -93,6 +96,25 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl .addValidator(StandardValidators.PORT_VALIDATOR) .build(); + public static final PropertyDescriptor PROXY_USERNAME = new PropertyDescriptor.Builder() + .name("proxy-user-name") + .displayName("Proxy Username") + .description("Proxy username") + .expressionLanguageSupported(true) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROXY_PASSWORD = new PropertyDescriptor.Builder() + .name("proxy-user-password") + .displayName("Proxy Password") + .description("Proxy password") + .expressionLanguageSupported(true) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + public static final PropertyDescriptor REGION = new 
PropertyDescriptor.Builder() .name("Region") .required(true) @@ -131,6 +153,9 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl protected static final Protocol DEFAULT_PROTOCOL = Protocol.HTTPS; protected static final String DEFAULT_USER_AGENT = "NiFi"; + private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH}; + public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS); + private static AllowableValue createAllowableValue(final Regions region) { return new AllowableValue(region.getName(), AWSRegions.getRegionDisplayName(region.getName())); } @@ -169,6 +194,8 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl problems.add(new ValidationResult.Builder().input("Proxy Host Port").valid(false).explanation("Both proxy host and port must be set").build()); } + ProxyConfiguration.validateProxySpec(validationContext, problems, PROXY_SPECS); + return problems; } @@ -193,11 +220,31 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl } } - if (context.getProperty(PROXY_HOST).isSet()) { - String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue(); - config.setProxyHost(proxyHost); - Integer proxyPort = context.getProperty(PROXY_HOST_PORT).evaluateAttributeExpressions().asInteger(); - config.setProxyPort(proxyPort); + final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context, () -> { + if (context.getProperty(PROXY_HOST).isSet()) { + final ProxyConfiguration componentProxyConfig = new ProxyConfiguration(); + String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue(); + Integer proxyPort = context.getProperty(PROXY_HOST_PORT).evaluateAttributeExpressions().asInteger(); + String proxyUsername = context.getProperty(PROXY_USERNAME).evaluateAttributeExpressions().getValue(); + String proxyPassword 
= context.getProperty(PROXY_PASSWORD).evaluateAttributeExpressions().getValue(); + componentProxyConfig.setProxyType(Proxy.Type.HTTP); + componentProxyConfig.setProxyServerHost(proxyHost); + componentProxyConfig.setProxyServerPort(proxyPort); + componentProxyConfig.setProxyUserName(proxyUsername); + componentProxyConfig.setProxyUserPassword(proxyPassword); + return componentProxyConfig; + } + return ProxyConfiguration.DIRECT_CONFIGURATION; + }); + + if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) { + config.setProxyHost(proxyConfig.getProxyServerHost()); + config.setProxyPort(proxyConfig.getProxyServerPort()); + + if (proxyConfig.hasCredential()) { + config.setProxyUsername(proxyConfig.getProxyUserName()); + config.setProxyPassword(proxyConfig.getProxyUserPassword()); + } } return config; http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java index be84330..f0273e5 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java @@ -174,7 +174,7 @@ public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor Collections.unmodifiableList( Arrays.asList(NAMESPACE, METRIC_NAME, VALUE, MAXIMUM, MINIMUM, SAMPLECOUNT, SUM, TIMESTAMP, UNIT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, - TIMEOUT, 
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT) + TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD) ); private volatile Set<String> dynamicPropertyNames = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java index ee614a6..3c9f73b 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java @@ -74,7 +74,8 @@ public class DeleteDynamoDB extends AbstractWriteDynamoDBProcessor { public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE, HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY, - CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE)); + CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, + PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD)); @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java index 73c9f9a..328dae6 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java @@ -80,7 +80,8 @@ public class GetDynamoDB extends AbstractDynamoDBProcessor { public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE, HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, JSON_DOCUMENT, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY, - CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE)); + CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, + PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD)); public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found") .description("FlowFiles are routed to not found relationship if key not found in the table").build(); http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java index 08a4b23..92e552a 100644 --- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java @@ -84,7 +84,8 @@ public class PutDynamoDB extends AbstractWriteDynamoDBProcessor { public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE, HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, JSON_DOCUMENT, DOCUMENT_CHARSET, BATCH_SIZE, - REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE)); + REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, + PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD)); /** * Dyamodb max item size limit 400 kb http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java index 8c8856d..8110a9a 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java @@ -72,7 +72,7 @@ public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor { public static final List<PropertyDescriptor> properties 
= Collections.unmodifiableList( Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, - PROXY_HOST, PROXY_HOST_PORT, ENDPOINT_OVERRIDE)); + PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, ENDPOINT_OVERRIDE)); /** * Max buffer size 1 MB http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java index 14bdf82..13aedfe 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java @@ -80,7 +80,7 @@ public class PutKinesisStream extends AbstractKinesisStreamProcessor { public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( Arrays.asList(KINESIS_STREAM_NAME, KINESIS_PARTITION_KEY, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, - AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_HOST, PROXY_HOST_PORT, ENDPOINT_OVERRIDE)); + AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, ENDPOINT_OVERRIDE)); /** A random number generator for cases where partition key is not available */ protected Random randomParitionKeyGenerator = new Random(); 
http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/lambda/PutLambda.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/lambda/PutLambda.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/lambda/PutLambda.java index 6daf19b..b14836d 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/lambda/PutLambda.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/lambda/PutLambda.java @@ -127,8 +127,8 @@ public class PutLambda extends AbstractAWSLambdaProcessor { public static final long MAX_REQUEST_SIZE = 6 * 1000 * 1000; public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( - Arrays.asList(AWS_LAMBDA_FUNCTION_NAME, AWS_LAMBDA_FUNCTION_QUALIFIER, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT - )); + Arrays.asList(AWS_LAMBDA_FUNCTION_NAME, AWS_LAMBDA_FUNCTION_QUALIFIER, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, + PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD)); @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java index f2de794..7d2cce9 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java @@ -59,7 +59,7 @@ public class DeleteS3Object extends AbstractS3Processor { public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, TIMEOUT, VERSION_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, - SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT)); + SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD)); @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java index 61a3d06..122231a 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java @@ -76,7 +76,7 @@ public class FetchS3Object extends AbstractS3Processor { public static 
final List<PropertyDescriptor> properties = Collections.unmodifiableList( Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, VERSION_ID, - SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT)); + SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD)); @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java index d988d16..0fd5ab7 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java @@ -137,7 +137,8 @@ public class ListS3 extends AbstractS3Processor { public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, - SIGNER_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE, MIN_AGE)); + SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, + DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE, MIN_AGE)); public static final Set<Relationship> relationships = Collections.unmodifiableSet( new HashSet<>(Collections.singletonList(REL_SUCCESS))); 
http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java index 3dc3199..f856b85 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java @@ -209,7 +209,7 @@ public class PutS3Object extends AbstractS3Processor { Arrays.asList(KEY, BUCKET, CONTENT_TYPE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, CANNED_ACL, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE, - SERVER_SIDE_ENCRYPTION, PROXY_HOST, PROXY_HOST_PORT)); + SERVER_SIDE_ENCRYPTION, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD)); final static String S3_BUCKET_KEY = "s3.bucket"; final static String S3_OBJECT_KEY = "s3.key"; http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java index 4a93243..2acc38b 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java @@ -77,7 +77,7 @@ public class PutSNS extends AbstractSNSProcessor { public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, - USE_JSON_STRUCTURE, CHARACTER_ENCODING, PROXY_HOST, PROXY_HOST_PORT)); + USE_JSON_STRUCTURE, CHARACTER_ENCODING, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD)); public static final int MAX_SIZE = 256 * 1024; http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java index 381046c..2bde4e1 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.proxy.ProxyConfigurationService; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -141,7 +142,7 @@ public class 
TestDeleteS3Object { public void testGetPropertyDescriptors() throws Exception { DeleteS3Object processor = new DeleteS3Object(); List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors(); - assertEquals("size should be eq", 20, pd.size()); + assertEquals("size should be eq", 23, pd.size()); assertTrue(pd.contains(processor.ACCESS_KEY)); assertTrue(pd.contains(processor.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(processor.BUCKET)); @@ -160,5 +161,10 @@ public class TestDeleteS3Object { assertTrue(pd.contains(processor.VERSION_ID)); assertTrue(pd.contains(processor.WRITE_ACL_LIST)); assertTrue(pd.contains(processor.WRITE_USER_LIST)); + assertTrue(pd.contains(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE)); + assertTrue(pd.contains(processor.PROXY_HOST)); + assertTrue(pd.contains(processor.PROXY_HOST_PORT)); + assertTrue(pd.contains(processor.PROXY_USERNAME)); + assertTrue(pd.contains(processor.PROXY_PASSWORD)); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java index 1ebf79b..bcfff23 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.proxy.ProxyConfigurationService; import 
org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -178,7 +179,7 @@ public class TestFetchS3Object { public void testGetPropertyDescriptors() throws Exception { FetchS3Object processor = new FetchS3Object(); List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors(); - assertEquals("size should be eq", 14, pd.size()); + assertEquals("size should be eq", 17, pd.size()); assertTrue(pd.contains(FetchS3Object.ACCESS_KEY)); assertTrue(pd.contains(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(FetchS3Object.BUCKET)); @@ -191,5 +192,11 @@ public class TestFetchS3Object { assertTrue(pd.contains(FetchS3Object.SSL_CONTEXT_SERVICE)); assertTrue(pd.contains(FetchS3Object.TIMEOUT)); assertTrue(pd.contains(FetchS3Object.VERSION_ID)); + assertTrue(pd.contains(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE)); + assertTrue(pd.contains(FetchS3Object.PROXY_HOST)); + assertTrue(pd.contains(FetchS3Object.PROXY_HOST_PORT)); + assertTrue(pd.contains(FetchS3Object.PROXY_USERNAME)); + assertTrue(pd.contains(FetchS3Object.PROXY_PASSWORD)); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java index f5ce291..48210b9 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java @@ -26,6 +26,7 @@ import java.util.Map; import 
org.apache.commons.lang3.time.DateUtils; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.Scope; +import org.apache.nifi.proxy.ProxyConfigurationService; import org.apache.nifi.state.MockStateManager; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -294,7 +295,7 @@ public class TestListS3 { public void testGetPropertyDescriptors() throws Exception { ListS3 processor = new ListS3(); List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors(); - assertEquals("size should be eq", 17, pd.size()); + assertEquals("size should be eq", 20, pd.size()); assertTrue(pd.contains(ListS3.ACCESS_KEY)); assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(ListS3.BUCKET)); @@ -305,11 +306,15 @@ public class TestListS3 { assertTrue(pd.contains(ListS3.SIGNER_OVERRIDE)); assertTrue(pd.contains(ListS3.SSL_CONTEXT_SERVICE)); assertTrue(pd.contains(ListS3.TIMEOUT)); - assertTrue(pd.contains(ListS3.PROXY_HOST)); - assertTrue(pd.contains(ListS3.PROXY_HOST_PORT)); assertTrue(pd.contains(ListS3.DELIMITER)); assertTrue(pd.contains(ListS3.PREFIX)); assertTrue(pd.contains(ListS3.USE_VERSIONS)); + assertTrue(pd.contains(ListS3.LIST_TYPE)); assertTrue(pd.contains(ListS3.MIN_AGE)); + assertTrue(pd.contains(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE)); + assertTrue(pd.contains(ListS3.PROXY_HOST)); + assertTrue(pd.contains(ListS3.PROXY_HOST_PORT)); + assertTrue(pd.contains(ListS3.PROXY_USERNAME)); + assertTrue(pd.contains(ListS3.PROXY_PASSWORD)); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java index 0ee7792..747da00 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java @@ -25,6 +25,7 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.proxy.ProxyConfigurationService; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -150,7 +151,7 @@ public class TestPutS3Object { public void testGetPropertyDescriptors() throws Exception { PutS3Object processor = new PutS3Object(); List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors(); - assertEquals("size should be eq", 28, pd.size()); + assertEquals("size should be eq", 31, pd.size()); assertTrue(pd.contains(PutS3Object.ACCESS_KEY)); assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(PutS3Object.BUCKET)); @@ -172,6 +173,11 @@ public class TestPutS3Object { assertTrue(pd.contains(PutS3Object.WRITE_ACL_LIST)); assertTrue(pd.contains(PutS3Object.WRITE_USER_LIST)); assertTrue(pd.contains(PutS3Object.SERVER_SIDE_ENCRYPTION)); + assertTrue(pd.contains(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE)); + assertTrue(pd.contains(PutS3Object.PROXY_HOST)); + assertTrue(pd.contains(PutS3Object.PROXY_HOST_PORT)); + assertTrue(pd.contains(PutS3Object.PROXY_USERNAME)); + assertTrue(pd.contains(PutS3Object.PROXY_PASSWORD)); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index 1bca21a..d031bf5 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -41,6 +41,10 @@ <artifactId>nifi-record</artifactId> </dependency> <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-proxy-configuration-api</artifactId> + </dependency> + <dependency> <groupId>com.microsoft.azure</groupId> <artifactId>azure-eventhubs</artifactId> <version>${azure-eventhubs.version}</version> http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java index 5b2d8ce..2156b56 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java @@ -59,7 +59,8 @@ public abstract class AbstractAzureBlobProcessor extends AbstractProcessor { AzureStorageUtils.PROP_SAS_TOKEN, AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, - BLOB)); + BLOB, + AzureStorageUtils.PROXY_CONFIGURATION_SERVICE)); private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet( new HashSet<>(Arrays.asList( @@ -73,7 +74,9 @@ public abstract 
class AbstractAzureBlobProcessor extends AbstractProcessor { @Override protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { - return AzureStorageUtils.validateCredentialProperties(validationContext); + final Collection<ValidationResult> results = AzureStorageUtils.validateCredentialProperties(validationContext); + AzureStorageUtils.validateProxySpec(validationContext, results); + return results; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java index 2819f99..a3f66d8 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.azure.storage; +import com.microsoft.azure.storage.OperationContext; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.CloudBlob; import com.microsoft.azure.storage.blob.CloudBlobClient; @@ -58,7 +59,10 @@ public class DeleteAzureBlobStorage extends AbstractAzureBlobProcessor { CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), flowFile); CloudBlobContainer container = blobClient.getContainerReference(containerName); CloudBlob blob = container.getBlockBlobReference(blobPath); - blob.deleteIfExists(); + + final 
OperationContext operationContext = new OperationContext(); + AzureStorageUtils.setProxy(operationContext, context); + blob.deleteIfExists(null, null, null, operationContext); session.transfer(flowFile, REL_SUCCESS); final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java index b9bbf44..2300cea 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java @@ -24,6 +24,7 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import com.microsoft.azure.storage.OperationContext; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -69,6 +70,9 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor { CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), flowFile); CloudBlobContainer container = blobClient.getContainerReference(containerName); + final OperationContext operationContext = new OperationContext(); + AzureStorageUtils.setProxy(operationContext, context); + final Map<String, String> 
attributes = new HashMap<>(); final CloudBlob blob = container.getBlockBlobReference(blobPath); @@ -76,7 +80,7 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor { // distribution of download over threads, investigate flowFile = session.write(flowFile, os -> { try { - blob.download(os); + blob.download(os, null, null, operationContext); } catch (StorageException e) { storedException.set(e); throw new IOException(e); http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java index fd27958..d9df136 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.azure.storage; +import com.microsoft.azure.storage.OperationContext; import com.microsoft.azure.storage.StorageUri; import com.microsoft.azure.storage.blob.BlobListingDetails; import com.microsoft.azure.storage.blob.BlobProperties; @@ -93,7 +94,8 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> { AzureStorageUtils.PROP_SAS_TOKEN, AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, - PROP_PREFIX)); + PROP_PREFIX, + AzureStorageUtils.PROXY_CONFIGURATION_SERVICE)); @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { @@ -102,7 +104,9 @@ public class 
ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> { @Override protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { - return AzureStorageUtils.validateCredentialProperties(validationContext); + final Collection<ValidationResult> results = AzureStorageUtils.validateCredentialProperties(validationContext); + AzureStorageUtils.validateProxySpec(validationContext, results); + return results; } @Override @@ -162,7 +166,10 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> { CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), null); CloudBlobContainer container = blobClient.getContainerReference(containerName); - for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, null)) { + final OperationContext operationContext = new OperationContext(); + AzureStorageUtils.setProxy(operationContext, context); + + for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, operationContext)) { if (blob instanceof CloudBlob) { CloudBlob cloudBlob = (CloudBlob) blob; BlobProperties properties = cloudBlob.getProperties(); http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java index abcb2ee..f42e4b3 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java +++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java @@ -26,6 +26,7 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import com.microsoft.azure.storage.OperationContext; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -77,6 +78,9 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor { CloudBlob blob = container.getBlockBlobReference(blobPath); + final OperationContext operationContext = new OperationContext(); + AzureStorageUtils.setProxy(operationContext, context); + final Map<String, String> attributes = new HashMap<>(); long length = flowFile.getSize(); session.read(flowFile, rawIn -> { @@ -87,7 +91,7 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor { } try { - blob.upload(in, length); + blob.upload(in, length, null, null, operationContext); BlobProperties properties = blob.getProperties(); attributes.put("azure.container", containerName); attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString()); http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java index d6e510e..c3a2877 100644 --- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.azure.storage.queue; +import com.microsoft.azure.storage.OperationContext; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.queue.CloudQueue; import com.microsoft.azure.storage.queue.CloudQueueClient; @@ -94,7 +95,7 @@ public class GetAzureQueueStorage extends AbstractAzureQueueStorage { private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList( AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE, - BATCH_SIZE, VISIBILITY_TIMEOUT)); + BATCH_SIZE, VISIBILITY_TIMEOUT, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE)); @Override public List<PropertyDescriptor> getSupportedPropertyDescriptors() { @@ -122,7 +123,11 @@ public class GetAzureQueueStorage extends AbstractAzureQueueStorage { try { cloudQueueClient = createCloudQueueClient(context, null); cloudQueue = cloudQueueClient.getQueueReference(queue); - retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null); + + final OperationContext operationContext = new OperationContext(); + AzureStorageUtils.setProxy(operationContext, context); + + retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, operationContext); } catch (URISyntaxException | StorageException e) { getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e}); context.yield(); @@ -184,6 +189,8 @@ public class GetAzureQueueStorage extends AbstractAzureQueueStorage { .build()); } + 
AzureStorageUtils.validateProxySpec(validationContext, problems); + return problems; } http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java index c289a74..4172c89 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.azure.storage.queue; +import com.microsoft.azure.storage.OperationContext; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.queue.CloudQueue; import com.microsoft.azure.storage.queue.CloudQueueClient; @@ -70,7 +71,7 @@ public class PutAzureQueueStorage extends AbstractAzureQueueStorage { private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList( AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, TTL, - QUEUE, VISIBILITY_DELAY)); + QUEUE, VISIBILITY_DELAY, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE)); @Override public List<PropertyDescriptor> getSupportedPropertyDescriptors() { @@ -101,7 +102,11 @@ public class PutAzureQueueStorage extends AbstractAzureQueueStorage { try { cloudQueueClient = createCloudQueueClient(context, flowFile); cloudQueue = cloudQueueClient.getQueueReference(queue); - 
cloudQueue.addMessage(message, ttl, delay, null, null); + + final OperationContext operationContext = new OperationContext(); + AzureStorageUtils.setProxy(operationContext, context); + + cloudQueue.addMessage(message, ttl, delay, null, operationContext); } catch (URISyntaxException | StorageException e) { getLogger().error("Failed to write the message to Azure Queue Storage due to {}", new Object[]{e}); flowFile = session.penalize(flowFile); @@ -147,6 +152,8 @@ public class PutAzureQueueStorage extends AbstractAzureQueueStorage { } } + AzureStorageUtils.validateProxySpec(validationContext, problems); + return problems; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java index 0a9696e..2821258 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.azure.storage.utils; import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.OperationContext; import com.microsoft.azure.storage.StorageCredentials; import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature; import com.microsoft.azure.storage.blob.CloudBlobClient; @@ -29,6 +30,8 @@ import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; 
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.proxy.ProxyConfiguration; +import org.apache.nifi.proxy.ProxySpec; import java.net.URI; import java.net.URISyntaxException; @@ -162,4 +165,17 @@ public final class AzureStorageUtils { return results; } + + private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP, ProxySpec.SOCKS}; + public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE + = ProxyConfiguration.createProxyConfigPropertyDescriptor(false, PROXY_SPECS); + + public static void validateProxySpec(ValidationContext context, Collection<ValidationResult> results) { + ProxyConfiguration.validateProxySpec(context, results, PROXY_SPECS); + } + + public static void setProxy(final OperationContext operationContext, final ProcessContext processContext) { + final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(processContext); + operationContext.setProxy(proxyConfig.createProxy()); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml index 5ab046d..d4157dd 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml @@ -57,6 +57,10 @@ language governing permissions and limitations under the License. 
--> <artifactId>nifi-record</artifactId> </dependency> <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-proxy-configuration-api</artifactId> + </dependency> + <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.7</version> http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java index b279346..0c2a124 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java @@ -33,13 +33,14 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.proxy.ProxyConfiguration; +import org.apache.nifi.proxy.ProxySpec; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.util.StringUtils; import javax.net.ssl.SSLContext; import java.io.IOException; import java.io.InputStream; -import java.net.InetSocketAddress; import java.net.Proxy; import java.net.URL; import java.util.ArrayList; @@ -136,25 +137,26 @@ public abstract class 
AbstractElasticsearchHttpProcessor extends AbstractElastic .build(); } - private static final List<PropertyDescriptor> propertyDescriptors; + private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH, ProxySpec.SOCKS}; + public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE + = ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS); + static final List<PropertyDescriptor> COMMON_PROPERTY_DESCRIPTORS; static { final List<PropertyDescriptor> properties = new ArrayList<>(); properties.add(ES_URL); + properties.add(PROP_SSL_CONTEXT_SERVICE); + properties.add(USERNAME); + properties.add(PASSWORD); + properties.add(CONNECT_TIMEOUT); + properties.add(RESPONSE_TIMEOUT); + properties.add(PROXY_CONFIGURATION_SERVICE); properties.add(PROXY_HOST); properties.add(PROXY_PORT); properties.add(PROXY_USERNAME); properties.add(PROXY_PASSWORD); - properties.add(RESPONSE_TIMEOUT); - propertyDescriptors = Collections.unmodifiableList(properties); - } - - @Override - public List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); - properties.addAll(propertyDescriptors); - return properties; + COMMON_PROPERTY_DESCRIPTORS = Collections.unmodifiableList(properties); } @Override @@ -164,28 +166,39 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic OkHttpClient.Builder okHttpClient = new OkHttpClient.Builder(); // Add a proxy if set - final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue(); - final Integer proxyPort = context.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger(); - if (proxyHost != null && proxyPort != null) { - final Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort)); + final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context, () -> { + final String proxyHost = 
context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue(); + final Integer proxyPort = context.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger(); + if (proxyHost != null && proxyPort != null) { + final ProxyConfiguration componentProxyConfig = new ProxyConfiguration(); + componentProxyConfig.setProxyType(Proxy.Type.HTTP); + componentProxyConfig.setProxyServerHost(proxyHost); + componentProxyConfig.setProxyServerPort(proxyPort); + componentProxyConfig.setProxyUserName(context.getProperty(PROXY_USERNAME).evaluateAttributeExpressions().getValue()); + componentProxyConfig.setProxyUserPassword(context.getProperty(PROXY_PASSWORD).evaluateAttributeExpressions().getValue()); + return componentProxyConfig; + } + return ProxyConfiguration.DIRECT_CONFIGURATION; + }); + + if (!Proxy.Type.DIRECT.equals(proxyConfig.getProxyType())) { + final Proxy proxy = proxyConfig.createProxy(); okHttpClient.proxy(proxy); - } - final String proxyUsername = context.getProperty(PROXY_USERNAME).evaluateAttributeExpressions().getValue(); - final String proxyPassword = context.getProperty(PROXY_PASSWORD).evaluateAttributeExpressions().getValue(); - - if (proxyUsername != null && proxyPassword != null){ - okHttpClient.proxyAuthenticator(new Authenticator() { - @Override - public Request authenticate(Route route, Response response) throws IOException { - final String credential=Credentials.basic(proxyUsername, proxyPassword); - return response.request().newBuilder() - .header("Proxy-Authorization", credential) - .build(); - } - }); + if (proxyConfig.hasCredential()){ + okHttpClient.proxyAuthenticator(new Authenticator() { + @Override + public Request authenticate(Route route, Response response) throws IOException { + final String credential=Credentials.basic(proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword()); + return response.request().newBuilder() + .header("Proxy-Authorization", credential) + .build(); + } + }); + } } + // Set timeouts 
okHttpClient.connectTimeout((context.getProperty(CONNECT_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS); okHttpClient.readTimeout(context.getProperty(RESPONSE_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS); @@ -208,8 +221,12 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic results.add(new ValidationResult.Builder() .valid(false) .explanation("Proxy Host and Proxy Port must be both set or empty") + .subject("Proxy server configuration") .build()); } + + ProxyConfiguration.validateProxySpec(validationContext, results, PROXY_SPECS); + return results; } http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java index e069c72..e782ba3 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java @@ -147,13 +147,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { _rels.add(REL_NOT_FOUND); relationships = Collections.unmodifiableSet(_rels); - final List<PropertyDescriptor> descriptors = new ArrayList<>(); - descriptors.add(ES_URL); - 
descriptors.add(PROP_SSL_CONTEXT_SERVICE); - descriptors.add(USERNAME); - descriptors.add(PASSWORD); - descriptors.add(CONNECT_TIMEOUT); - descriptors.add(RESPONSE_TIMEOUT); + final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS); descriptors.add(DOC_ID); descriptors.add(INDEX); descriptors.add(TYPE); @@ -169,9 +163,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { @Override public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); - properties.addAll(propertyDescriptors); - return properties; + return propertyDescriptors; } @OnScheduled http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java index cd1092a..ed9a510 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java @@ -150,13 +150,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { _rels.add(REL_RETRY); relationships = Collections.unmodifiableSet(_rels); - final List<PropertyDescriptor> descriptors = new ArrayList<>(); - descriptors.add(ES_URL); - 
descriptors.add(PROP_SSL_CONTEXT_SERVICE); - descriptors.add(USERNAME); - descriptors.add(PASSWORD); - descriptors.add(CONNECT_TIMEOUT); - descriptors.add(RESPONSE_TIMEOUT); + final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS); descriptors.add(ID_ATTRIBUTE); descriptors.add(INDEX); descriptors.add(TYPE); @@ -174,9 +168,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { @Override public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); - properties.addAll(propertyDescriptors); - return properties; + return propertyDescriptors; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java index 9ea22bc..3e796fb 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java @@ -189,13 +189,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess _rels.add(REL_RETRY); relationships = Collections.unmodifiableSet(_rels); - final List<PropertyDescriptor> descriptors = new ArrayList<>(); - descriptors.add(ES_URL); - 
descriptors.add(PROP_SSL_CONTEXT_SERVICE); - descriptors.add(USERNAME); - descriptors.add(PASSWORD); - descriptors.add(CONNECT_TIMEOUT); - descriptors.add(RESPONSE_TIMEOUT); + final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS); descriptors.add(RECORD_READER); descriptors.add(ID_RECORD_PATH); descriptors.add(INDEX); http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java index b1f860f..33eac3b 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java @@ -230,13 +230,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { private QueryInfoRouteStrategy queryInfoRouteStrategy = QueryInfoRouteStrategy.NEVER; static { - final List<PropertyDescriptor> descriptors = new ArrayList<>(); - descriptors.add(ES_URL); - descriptors.add(PROP_SSL_CONTEXT_SERVICE); - descriptors.add(USERNAME); - descriptors.add(PASSWORD); - descriptors.add(CONNECT_TIMEOUT); - descriptors.add(RESPONSE_TIMEOUT); + final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS); descriptors.add(QUERY); descriptors.add(PAGE_SIZE); descriptors.add(INDEX); @@ -257,9 +251,7 @@ public class 
QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { @Override public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); - properties.addAll(propertyDescriptors); - return properties; + return propertyDescriptors; } @OnScheduled http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java index f08e33c..e90af79 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java @@ -182,13 +182,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor _rels.add(REL_FAILURE); relationships = Collections.unmodifiableSet(_rels); - final List<PropertyDescriptor> descriptors = new ArrayList<>(); - descriptors.add(ES_URL); - descriptors.add(PROP_SSL_CONTEXT_SERVICE); - descriptors.add(USERNAME); - descriptors.add(PASSWORD); - descriptors.add(CONNECT_TIMEOUT); - descriptors.add(RESPONSE_TIMEOUT); + final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS); descriptors.add(QUERY); descriptors.add(SCROLL_DURATION); descriptors.add(PAGE_SIZE); @@ -207,9 +201,7 @@ public class 
ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor @Override public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); - properties.addAll(propertyDescriptors); - return properties; + return propertyDescriptors; } @OnScheduled http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index d3fa7f8..1e7ffb4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -66,6 +66,10 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-proxy-configuration-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-record</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java index 4886274..4b4d207 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java @@ 
-18,6 +18,7 @@ package org.apache.nifi.processors.standard; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -27,6 +28,8 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.FTPTransfer; @@ -63,6 +66,7 @@ public class FetchFTP extends FetchFileTransfer { properties.add(FTPTransfer.USE_COMPRESSION); properties.add(FTPTransfer.CONNECTION_MODE); properties.add(FTPTransfer.TRANSFER_MODE); + properties.add(FTPTransfer.PROXY_CONFIGURATION_SERVICE); properties.add(FTPTransfer.PROXY_TYPE); properties.add(FTPTransfer.PROXY_HOST); properties.add(FTPTransfer.PROXY_PORT); @@ -76,4 +80,11 @@ public class FetchFTP extends FetchFileTransfer { protected FileTransfer createFileTransfer(final ProcessContext context) { return new FTPTransfer(context, getLogger()); } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + final List<ValidationResult> results = new ArrayList<>(); + FTPTransfer.validateProxySpec(validationContext, results); + return results; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/2834fa4c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java index 6846557..19cba94 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -28,7 +29,10 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.standard.util.FTPTransfer; import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.SFTPTransfer; @@ -76,6 +80,12 @@ public class FetchSFTP extends FetchFileTransfer { properties.add(SFTPTransfer.HOST_KEY_FILE); properties.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING); properties.add(SFTPTransfer.USE_COMPRESSION); + properties.add(SFTPTransfer.PROXY_CONFIGURATION_SERVICE); + properties.add(FTPTransfer.PROXY_TYPE); + properties.add(FTPTransfer.PROXY_HOST); + properties.add(FTPTransfer.PROXY_PORT); + properties.add(FTPTransfer.HTTP_PROXY_USERNAME); + properties.add(FTPTransfer.HTTP_PROXY_PASSWORD); return properties; } @@ -83,4 +93,11 @@ public class FetchSFTP extends FetchFileTransfer { protected FileTransfer 
createFileTransfer(final ProcessContext context) { return new SFTPTransfer(context, getLogger()); } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + final Collection<ValidationResult> results = new ArrayList<>(); + SFTPTransfer.validateProxySpec(validationContext, results); + return results; + } }
