mark-bathori commented on code in PR #8368:
URL: https://github.com/apache/nifi/pull/8368#discussion_r1482928857


##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AwsFileResourceService.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.SdkClientException;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.S3Object;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.documentation.UseCase;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.fileresource.service.api.FileResource;
+import org.apache.nifi.fileresource.service.api.FileResourceService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE;
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.resolveRegion;
+import static org.apache.nifi.util.StringUtils.isBlank;
+
+@Tags({"Amazon", "S3", "AWS", "file", "resource"})
+@SeeAlso({FetchS3Object.class})
+@CapabilityDescription("Provides an Amazon Web Services (AWS) S3 file resource for other components.")
+@UseCase(
+        description = "Fetch a specific file from S3. " +
+                "The service provides higher performance compared to fetch processors when the data should be moved between different storages without any transformation.",
+        configuration = """
+                "Bucket" = "${s3.bucket}"
+                "Name" = "${filename}"
+
+                The "Region" property must be set to denote the S3 region that the Bucket resides in.
+                If the flow being built is to be reused elsewhere, it's a good idea to parameterize this property by setting it to something like #{S3_REGION}.
+
+                The "AWS Credentials Provider Service" property should specify an instance of the AWSCredentialsProviderService in order to provide credentials for accessing the bucket.
+                """
+)
+public class AwsFileResourceService extends AbstractControllerService implements FileResourceService {

Review Comment:
   I was wondering whether `S3FileResourceService` might be a better name,
since the Controller Service fetches data from S3 and the other processors
already use the 'S3' naming, e.g. FetchS3Object, ListS3, PutS3Object. What is
your opinion?



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/util/RegionUtilV1.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.util;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.Map;
+
+import static org.apache.nifi.processors.aws.s3.AbstractS3Processor.S3_REGION;
+
+/**
+ * Utility class for AWS region methods. This class uses AWS SDK v1.
+ *
+ */
+public abstract class RegionUtilV1 {

Review Comment:
   As far as I can see, every method and variable in this class already exists
in `AbstractS3Processor`. I think it would be better to extract them into a
common place and use them in both `AbstractS3Processor` and
`AwsFileResourceService`.
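
A minimal sketch of the extraction idea, not the PR's actual code. All names
here (`RegionResolverSketch`, `ProcessorSketch`, `ServiceSketch`, the
`s3.region` fallback rule) are hypothetical and only illustrate two callers
delegating to one shared helper instead of each keeping its own copy:

```java
import java.util.Map;

// Hypothetical shared helper; the name and the fallback rule are illustrative only.
final class RegionResolverSketch {
    // Hypothetical attribute consulted when no region is configured.
    static final String REGION_ATTRIBUTE = "s3.region";

    private RegionResolverSketch() {
    }

    // Returns the configured region, or falls back to the attribute value.
    static String resolveRegion(final String configuredRegion, final Map<String, String> attributes) {
        if (configuredRegion != null && !configuredRegion.trim().isEmpty()) {
            return configuredRegion;
        }
        final String fromAttribute = attributes.get(REGION_ATTRIBUTE);
        if (fromAttribute == null || fromAttribute.trim().isEmpty()) {
            throw new IllegalStateException("No region configured and no '" + REGION_ATTRIBUTE + "' attribute present");
        }
        return fromAttribute;
    }
}

// Stand-in for the abstract processor: delegates instead of owning the logic.
final class ProcessorSketch {
    private final String configuredRegion;

    ProcessorSketch(final String configuredRegion) {
        this.configuredRegion = configuredRegion;
    }

    String regionFor(final Map<String, String> attributes) {
        return RegionResolverSketch.resolveRegion(configuredRegion, attributes);
    }
}

// Stand-in for the controller service: uses the very same helper.
final class ServiceSketch {
    private final String configuredRegion;

    ServiceSketch(final String configuredRegion) {
        this.configuredRegion = configuredRegion;
    }

    String regionFor(final Map<String, String> attributes) {
        return RegionResolverSketch.resolveRegion(configuredRegion, attributes);
    }
}
```

With a single helper, a fix to the resolution rule lands in both callers at
once, which is the main benefit over duplicating the methods.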



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AwsFileResourceService.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.SdkClientException;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.S3Object;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.documentation.UseCase;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.fileresource.service.api.FileResource;
+import org.apache.nifi.fileresource.service.api.FileResourceService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE;
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.resolveRegion;
+import static org.apache.nifi.util.StringUtils.isBlank;
+
+@Tags({"Amazon", "S3", "AWS", "file", "resource"})
+@SeeAlso({FetchS3Object.class})
+@CapabilityDescription("Provides an Amazon Web Services (AWS) S3 file resource for other components.")
+@UseCase(
+        description = "Fetch a specific file from S3. " +
+                "The service provides higher performance compared to fetch processors when the data should be moved between different storages without any transformation.",
+        configuration = """
+                "Bucket" = "${s3.bucket}"
+                "Name" = "${filename}"
+
+                The "Region" property must be set to denote the S3 region that the Bucket resides in.
+                If the flow being built is to be reused elsewhere, it's a good idea to parameterize this property by setting it to something like #{S3_REGION}.
+
+                The "AWS Credentials Provider Service" property should specify an instance of the AWSCredentialsProviderService in order to provide credentials for accessing the bucket.
+                """
+)
+public class AwsFileResourceService extends AbstractControllerService implements FileResourceService {
+
+    public static final PropertyDescriptor BUCKET_WITH_DEFAULT_VALUE = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(AbstractS3Processor.BUCKET_WITH_DEFAULT_VALUE)
+            .build();
+
+    public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(AbstractS3Processor.KEY)
+            .build();
+
+    public static final PropertyDescriptor S3_REGION = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(AbstractS3Processor.S3_REGION)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = List.of(
+            BUCKET_WITH_DEFAULT_VALUE,
+            KEY,
+            S3_REGION,
+            AWS_CREDENTIALS_PROVIDER_SERVICE);
+
+    private final Cache<Region, AmazonS3> clientCache = Caffeine.newBuilder().build();
+
+    private volatile PropertyContext context;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        this.context = context;
+    }
+
+    @OnDisabled
+    public void onDisabled() {
+        this.context = null;
+    }
+
+    @Override
+    public FileResource getFileResource(Map<String, String> attributes) {
+        final AWSCredentialsProviderService awsCredentialsProviderService = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE)
+                .asControllerService(AWSCredentialsProviderService.class);
+        final AmazonS3 client = getS3Client(attributes, awsCredentialsProviderService.getCredentialsProvider());
+
+        try {
+            return fetchObject(client, attributes);
+        } catch (final ProcessException | SdkClientException e) {
+            throw new ProcessException("Failed to fetch s3 object", e);
+        }
+    }
+
+    /**
+     * Fetches s3 object from the provided bucket and returns it as FileResource
+     *
+     * @param client amazon s3 client
+     * @param attributes configuration attributes
+     * @return fetched s3 object as FileResource
+     * @throws ProcessException if the object 'bucketName/key' does not exist
+     */
+    private FileResource fetchObject(final AmazonS3 client, final Map<String, String> attributes) throws ProcessException,
+            SdkClientException {
+        final String bucketName = context.getProperty(BUCKET_WITH_DEFAULT_VALUE).evaluateAttributeExpressions(attributes).getValue();
+        final String key = context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue();
+
+        if (isBlank(bucketName) || isBlank(key)) {
+            throw new ProcessException("Bucket name or key value is missing");
+        }
+
+        if (!client.doesObjectExist(bucketName, key)) {
+            throw new ProcessException(String.format("Object '%s/%s' does not exist in s3", bucketName, key));
+        }
+
+        final S3Object object = client.getObject(bucketName, key);
+        return new FileResource(object.getObjectContent(), object.getObjectMetadata().getContentLength());
+    }
+
+    protected AmazonS3 getS3Client(Map<String, String> attributes, AWSCredentialsProvider credentialsProvider) {
+        Region region = resolveRegion(context, attributes);

Review Comment:
   This can be `final`.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AwsFileResourceServiceTest.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import org.apache.nifi.fileresource.service.api.FileResource;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
+import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Map;
+
+import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class AwsFileResourceServiceTest {
+
+    private static final String TEST_NAME = AwsFileResourceServiceTest.class.getSimpleName();

Review Comment:
   I think this should be renamed, since it is used as a `Service Identifier`
and not as the name of the test. It could probably be inlined, since this
variable is only used in one place.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AwsFileResourceService.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.SdkClientException;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.S3Object;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.documentation.UseCase;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.fileresource.service.api.FileResource;
+import org.apache.nifi.fileresource.service.api.FileResourceService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE;
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.resolveRegion;
+import static org.apache.nifi.util.StringUtils.isBlank;
+
+@Tags({"Amazon", "S3", "AWS", "file", "resource"})
+@SeeAlso({FetchS3Object.class})
+@CapabilityDescription("Provides an Amazon Web Services (AWS) S3 file resource for other components.")
+@UseCase(
+        description = "Fetch a specific file from S3. " +
+                "The service provides higher performance compared to fetch processors when the data should be moved between different storages without any transformation.",
+        configuration = """
+                "Bucket" = "${s3.bucket}"
+                "Name" = "${filename}"
+
+                The "Region" property must be set to denote the S3 region that the Bucket resides in.
+                If the flow being built is to be reused elsewhere, it's a good idea to parameterize this property by setting it to something like #{S3_REGION}.
+
+                The "AWS Credentials Provider Service" property should specify an instance of the AWSCredentialsProviderService in order to provide credentials for accessing the bucket.
+                """
+)
+public class AwsFileResourceService extends AbstractControllerService implements FileResourceService {
+
+    public static final PropertyDescriptor BUCKET_WITH_DEFAULT_VALUE = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(AbstractS3Processor.BUCKET_WITH_DEFAULT_VALUE)
+            .build();
+
+    public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(AbstractS3Processor.KEY)
+            .build();
+
+    public static final PropertyDescriptor S3_REGION = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(AbstractS3Processor.S3_REGION)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = List.of(
+            BUCKET_WITH_DEFAULT_VALUE,
+            KEY,
+            S3_REGION,
+            AWS_CREDENTIALS_PROVIDER_SERVICE);
+
+    private final Cache<Region, AmazonS3> clientCache = Caffeine.newBuilder().build();

Review Comment:
   I think the cache should be emptied when the Controller Service is disabled,
so that it does not hold on to unnecessary references.
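
A minimal sketch of the idea, assuming a plain `ConcurrentHashMap` as a
stand-in for the Caffeine cache so the example runs without extra
dependencies. `CachingServiceSketch` and `FakeClient` are hypothetical names;
in the real service the method would carry `@OnDisabled`, and with Caffeine
the equivalent call would be `clientCache.invalidateAll()`:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a per-region S3 client.
final class FakeClient {
}

// Hypothetical service skeleton; a ConcurrentHashMap replaces the Caffeine cache here.
final class CachingServiceSketch {
    private final Map<String, FakeClient> clientCache = new ConcurrentHashMap<>();

    // Creates a client for the region on first use and reuses it afterwards.
    FakeClient getClient(final String region) {
        return clientCache.computeIfAbsent(region, r -> new FakeClient());
    }

    // Would be annotated with @OnDisabled in the actual Controller Service.
    void onDisabled() {
        // Drop all cached clients so a disabled service keeps no references.
        clientCache.clear();
    }

    int cachedClientCount() {
        return clientCache.size();
    }
}
```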



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
