mark-bathori commented on code in PR #8368: URL: https://github.com/apache/nifi/pull/8368#discussion_r1482928857
########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AwsFileResourceService.java: ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.SdkClientException; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.regions.Region; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.S3Object; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import 
org.apache.nifi.fileresource.service.api.FileResource; +import org.apache.nifi.fileresource.service.api.FileResourceService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService; + +import java.util.List; +import java.util.Map; + +import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.resolveRegion; +import static org.apache.nifi.util.StringUtils.isBlank; + +@Tags({"Amazon", "S3", "AWS", "file", "resource"}) +@SeeAlso({FetchS3Object.class}) +@CapabilityDescription("Provides an Amazon Web Services (AWS) S3 file resource for other components.") +@UseCase( + description = "Fetch a specific file from S3. " + + "The service provides higher performance compared to fetch processors when the data should be moved between different storages without any transformation.", + configuration = """ + "Bucket" = "${s3.bucket}" + "Name" = "${filename}" + + The "Region" property must be set to denote the S3 region that the Bucket resides in. + If the flow being built is to be reused elsewhere, it's a good idea to parameterize this property by setting it to something like #{S3_REGION}. + + The "AWS Credentials Provider Service" property should specify an instance of the AWSCredentialsProviderService in order to provide credentials for accessing the bucket. + """ +) +public class AwsFileResourceService extends AbstractControllerService implements FileResourceService { Review Comment: I wonder whether S3FileResourceService would be a better name, since the Controller Service fetches data from S3 and other processors already use the 'S3' naming, e.g. FetchS3, ListS3, PutS3, etc. What is your opinion? 
########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/util/RegionUtilV1.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.util; + +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.Map; + +import static org.apache.nifi.processors.aws.s3.AbstractS3Processor.S3_REGION; + +/** + * Utility class for AWS region methods. This class uses AWS SDK v1. + * + */ +public abstract class RegionUtilV1 { Review Comment: As far as I can see, every method and variable in this class already exists in `AbstractS3Processor`. I think it would be a better approach to extract these into a common place and use them in both `AbstractS3Processor` and `AwsFileResourceService`. 
########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AwsFileResourceService.java: ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.SdkClientException; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.regions.Region; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.S3Object; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import 
org.apache.nifi.fileresource.service.api.FileResource; +import org.apache.nifi.fileresource.service.api.FileResourceService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService; + +import java.util.List; +import java.util.Map; + +import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.resolveRegion; +import static org.apache.nifi.util.StringUtils.isBlank; + +@Tags({"Amazon", "S3", "AWS", "file", "resource"}) +@SeeAlso({FetchS3Object.class}) +@CapabilityDescription("Provides an Amazon Web Services (AWS) S3 file resource for other components.") +@UseCase( + description = "Fetch a specific file from S3. " + + "The service provides higher performance compared to fetch processors when the data should be moved between different storages without any transformation.", + configuration = """ + "Bucket" = "${s3.bucket}" + "Name" = "${filename}" + + The "Region" property must be set to denote the S3 region that the Bucket resides in. + If the flow being built is to be reused elsewhere, it's a good idea to parameterize this property by setting it to something like #{S3_REGION}. + + The "AWS Credentials Provider Service" property should specify an instance of the AWSCredentialsProviderService in order to provide credentials for accessing the bucket. 
+ """ +) +public class AwsFileResourceService extends AbstractControllerService implements FileResourceService { + + public static final PropertyDescriptor BUCKET_WITH_DEFAULT_VALUE = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(AbstractS3Processor.BUCKET_WITH_DEFAULT_VALUE) + .build(); + + public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(AbstractS3Processor.KEY) + .build(); + + public static final PropertyDescriptor S3_REGION = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(AbstractS3Processor.S3_REGION) + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = List.of( + BUCKET_WITH_DEFAULT_VALUE, + KEY, + S3_REGION, + AWS_CREDENTIALS_PROVIDER_SERVICE); + + private final Cache<Region, AmazonS3> clientCache = Caffeine.newBuilder().build(); + + private volatile PropertyContext context; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + this.context = context; + } + + @OnDisabled + public void onDisabled() { + this.context = null; + } + + @Override + public FileResource getFileResource(Map<String, String> attributes) { + final AWSCredentialsProviderService awsCredentialsProviderService = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE) + .asControllerService(AWSCredentialsProviderService.class); + final AmazonS3 client = getS3Client(attributes, awsCredentialsProviderService.getCredentialsProvider()); + + try { + return fetchObject(client, attributes); + } catch (final ProcessException | SdkClientException e) { + throw new ProcessException("Failed to fetch s3 object", e); + } + } + + /** + * Fetches s3 object from the provided bucket and returns it as FileResource + * + * @param client amazon s3 client + * @param attributes configuration attributes + * @return fetched s3 object as FileResource + * @throws ProcessException if the 
object 'bucketName/key' does not exist + */ + private FileResource fetchObject(final AmazonS3 client, final Map<String, String> attributes) throws ProcessException, + SdkClientException { + final String bucketName = context.getProperty(BUCKET_WITH_DEFAULT_VALUE).evaluateAttributeExpressions(attributes).getValue(); + final String key = context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue(); + + if (isBlank(bucketName) || isBlank(key)) { + throw new ProcessException("Bucket name or key value is missing"); + } + + if (!client.doesObjectExist(bucketName, key)) { + throw new ProcessException(String.format("Object '%s/%s' does not exist in s3", bucketName, key)); + } + + final S3Object object = client.getObject(bucketName, key); + return new FileResource(object.getObjectContent(), object.getObjectMetadata().getContentLength()); + } + + protected AmazonS3 getS3Client(Map<String, String> attributes, AWSCredentialsProvider credentialsProvider) { + Region region = resolveRegion(context, attributes); Review Comment: This can be `final`. ########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AwsFileResourceServiceTest.java: ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import org.apache.nifi.fileresource.service.api.FileResource; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Map; + +import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class AwsFileResourceServiceTest { + + private static final String TEST_NAME = AwsFileResourceServiceTest.class.getSimpleName(); Review Comment: I think this should be renamed since it is used as a `Service Identifier` and not as the name of 
the test, probably it could be added directly since this variable is only used at one place. ########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AwsFileResourceService.java: ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.SdkClientException; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.regions.Region; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.S3Object; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.fileresource.service.api.FileResource; +import org.apache.nifi.fileresource.service.api.FileResourceService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService; + +import java.util.List; +import java.util.Map; + +import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.resolveRegion; +import static org.apache.nifi.util.StringUtils.isBlank; + +@Tags({"Amazon", "S3", "AWS", "file", "resource"}) +@SeeAlso({FetchS3Object.class}) +@CapabilityDescription("Provides an Amazon Web Services (AWS) S3 file resource for other components.") +@UseCase( + description = "Fetch a specific file from S3. 
" + + "The service provides higher performance compared to fetch processors when the data should be moved between different storages without any transformation.", + configuration = """ + "Bucket" = "${s3.bucket}" + "Name" = "${filename}" + + The "Region" property must be set to denote the S3 region that the Bucket resides in. + If the flow being built is to be reused elsewhere, it's a good idea to parameterize this property by setting it to something like #{S3_REGION}. + + The "AWS Credentials Provider Service" property should specify an instance of the AWSCredentialsProviderService in order to provide credentials for accessing the bucket. + """ +) +public class AwsFileResourceService extends AbstractControllerService implements FileResourceService { + + public static final PropertyDescriptor BUCKET_WITH_DEFAULT_VALUE = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(AbstractS3Processor.BUCKET_WITH_DEFAULT_VALUE) + .build(); + + public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(AbstractS3Processor.KEY) + .build(); + + public static final PropertyDescriptor S3_REGION = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(AbstractS3Processor.S3_REGION) + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = List.of( + BUCKET_WITH_DEFAULT_VALUE, + KEY, + S3_REGION, + AWS_CREDENTIALS_PROVIDER_SERVICE); + + private final Cache<Region, AmazonS3> clientCache = Caffeine.newBuilder().build(); Review Comment: I think the cache should be emptied when the Controller Service is disabled, so it won't store any unnecessary reference. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
