KalmanJantner commented on code in PR #6589:
URL: https://github.com/apache/nifi/pull/6589#discussion_r1014431551


##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMlProcessor.java:
##########
@@ -0,0 +1,148 @@
+package org.apache.nifi.processors.aws.ml;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.AmazonWebServiceResult;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+public abstract class AwsMlProcessor<SERVICE extends AmazonWebServiceClient, 
REQUEST extends AmazonWebServiceRequest, RESPONSE extends 
AmazonWebServiceResult> extends 
AbstractAWSCredentialsProviderProcessor<SERVICE> {
+    protected static final String AWS_TASK_ID_PROPERTY = "awsTaskId";
+    public static final PropertyDescriptor JSON_PAYLOAD = new 
PropertyDescriptor.Builder()
+            .name("json-payload")
+            .displayName("JSON Payload")
+            .description("JSON Payload that represent an AWS ML Request. See 
more details in AWS API documentation.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final PropertyDescriptor 
MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE =
+            new 
PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE)
+                    .required(true)
+                    .build();
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            JSON_PAYLOAD,
+            MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE,
+            TIMEOUT,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            PROXY_CONFIGURATION_SERVICE,
+            PROXY_HOST,
+            PROXY_HOST_PORT,
+            PROXY_USERNAME,
+            PROXY_PASSWORD));
+    private final ObjectMapper mapper = JsonMapper.builder()
+            .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
+            .build();
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        RESPONSE response;
+        try {
+            response = sendRequest(buildRequest(session, context, flowFile), 
context);
+        } catch (Exception e) {
+            session.transfer(flowFile, REL_FAILURE);
+            getLogger().error("Exception was thrown during sending AWS ML 
request.", e);

Review Comment:
   Thanks I've updated.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMlProcessor.java:
##########
@@ -0,0 +1,148 @@
+package org.apache.nifi.processors.aws.ml;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.AmazonWebServiceResult;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+public abstract class AwsMlProcessor<SERVICE extends AmazonWebServiceClient, 
REQUEST extends AmazonWebServiceRequest, RESPONSE extends 
AmazonWebServiceResult> extends 
AbstractAWSCredentialsProviderProcessor<SERVICE> {
+    protected static final String AWS_TASK_ID_PROPERTY = "awsTaskId";
+    public static final PropertyDescriptor JSON_PAYLOAD = new 
PropertyDescriptor.Builder()
+            .name("json-payload")
+            .displayName("JSON Payload")
+            .description("JSON Payload that represent an AWS ML Request. See 
more details in AWS API documentation.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final PropertyDescriptor 
MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE =
+            new 
PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE)
+                    .required(true)
+                    .build();
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            JSON_PAYLOAD,
+            MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE,
+            TIMEOUT,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            PROXY_CONFIGURATION_SERVICE,
+            PROXY_HOST,
+            PROXY_HOST_PORT,
+            PROXY_USERNAME,
+            PROXY_PASSWORD));
+    private final ObjectMapper mapper = JsonMapper.builder()
+            .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
+            .build();
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        RESPONSE response;
+        try {
+            response = sendRequest(buildRequest(session, context, flowFile), 
context);
+        } catch (Exception e) {
+            session.transfer(flowFile, REL_FAILURE);
+            getLogger().error("Exception was thrown during sending AWS ML 
request.", e);
+            return;
+        }
+
+        try {
+            writeToFlowFile(session, flowFile, response);

Review Comment:
   Thanks, now we will keep the original flow file.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/PollyFetcher.java:
##########
@@ -0,0 +1,63 @@
+package org.apache.nifi.processors.aws.ml.polly;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.polly.AmazonPollyClient;
+import com.amazonaws.services.polly.AmazonPollyClientBuilder;
+import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskRequest;
+import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskResult;
+import com.amazonaws.services.polly.model.TaskStatus;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.ml.AwsMLFetcherProcessor;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"})
+@CapabilityDescription("Turn text into lifelike speech using deep learning. 
Checking Polly synthesis job's status.")
+public class PollyFetcher extends AwsMLFetcherProcessor<AmazonPollyClient> {

Review Comment:
   Based on your suggestion I have renamed them.
   Now we have `AwsMLJobStatusGetter`, `AwsMLJobStatusGetter` and each service 
have  a `StartAws<serviceName>Job` and a `GetAws<serviceName>JobStatus`



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMlProcessor.java:
##########
@@ -0,0 +1,148 @@
+package org.apache.nifi.processors.aws.ml;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.AmazonWebServiceResult;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+public abstract class AwsMlProcessor<SERVICE extends AmazonWebServiceClient, 
REQUEST extends AmazonWebServiceRequest, RESPONSE extends 
AmazonWebServiceResult> extends 
AbstractAWSCredentialsProviderProcessor<SERVICE> {
+    protected static final String AWS_TASK_ID_PROPERTY = "awsTaskId";
+    public static final PropertyDescriptor JSON_PAYLOAD = new 
PropertyDescriptor.Builder()
+            .name("json-payload")
+            .displayName("JSON Payload")
+            .description("JSON Payload that represent an AWS ML Request. See 
more details in AWS API documentation.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final PropertyDescriptor 
MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE =
+            new 
PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE)
+                    .required(true)
+                    .build();
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            JSON_PAYLOAD,
+            MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE,
+            TIMEOUT,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            PROXY_CONFIGURATION_SERVICE,
+            PROXY_HOST,
+            PROXY_HOST_PORT,
+            PROXY_USERNAME,
+            PROXY_PASSWORD));

Review Comment:
   Thank you, I have removed them.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/PollyProcessor.java:
##########
@@ -0,0 +1,38 @@
+package org.apache.nifi.processors.aws.ml.polly;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.polly.AmazonPollyClient;
+import com.amazonaws.services.polly.AmazonPollyClientBuilder;
+import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskRequest;
+import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskResult;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.ml.AwsMlProcessor;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"})
+@CapabilityDescription("Turn text into lifelike speech using deep learning.")
+@SeeAlso({PollyFetcher.class})

Review Comment:
   Sure, fixed.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMlProcessor.java:
##########
@@ -0,0 +1,148 @@
+package org.apache.nifi.processors.aws.ml;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.AmazonWebServiceResult;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+public abstract class AwsMlProcessor<SERVICE extends AmazonWebServiceClient, 
REQUEST extends AmazonWebServiceRequest, RESPONSE extends 
AmazonWebServiceResult> extends 
AbstractAWSCredentialsProviderProcessor<SERVICE> {
+    protected static final String AWS_TASK_ID_PROPERTY = "awsTaskId";
+    public static final PropertyDescriptor JSON_PAYLOAD = new 
PropertyDescriptor.Builder()
+            .name("json-payload")
+            .displayName("JSON Payload")
+            .description("JSON Payload that represent an AWS ML Request. See 
more details in AWS API documentation.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final PropertyDescriptor 
MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE =
+            new 
PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE)
+                    .required(true)
+                    .build();
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            JSON_PAYLOAD,
+            MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE,
+            TIMEOUT,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            PROXY_CONFIGURATION_SERVICE,
+            PROXY_HOST,
+            PROXY_HOST_PORT,
+            PROXY_USERNAME,
+            PROXY_PASSWORD));
+    private final ObjectMapper mapper = JsonMapper.builder()
+            .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
+            .build();
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        RESPONSE response;
+        try {
+            response = sendRequest(buildRequest(session, context, flowFile), 
context);
+        } catch (Exception e) {
+            session.transfer(flowFile, REL_FAILURE);
+            getLogger().error("Exception was thrown during sending AWS ML 
request.", e);
+            return;
+        }
+
+        try {
+            writeToFlowFile(session, flowFile, response);
+        } catch (Exception e) {
+            session.transfer(flowFile, REL_FAILURE);
+            getLogger().error("Exception was thrown during writing aws 
response to flow file.", e);
+            return;
+        }
+
+        try {
+            postProcessFlowFile(context, session, flowFile, response);
+        } catch (Exception e) {
+            session.transfer(flowFile, REL_FAILURE);
+            getLogger().error("Exception was thrown during AWS ML post 
processing.", e);
+            return;
+        }

Review Comment:
   Thanks, I have refactored this part.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/TextractFetcher.java:
##########
@@ -0,0 +1,68 @@
+package org.apache.nifi.processors.aws.ml.textract;
+
+import static 
org.apache.nifi.processors.aws.ml.textract.TextractProcessor.TYPE;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.AmazonTextractClient;
+import com.amazonaws.services.textract.model.JobStatus;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.ml.AwsMLFetcherProcessor;
+import org.apache.nifi.processors.aws.ml.polly.PollyFetcher;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Textract"})
+@CapabilityDescription("Automatically extract printed text, handwriting, and 
data from any document")
+@SeeAlso({TextractProcessor.class})
+public class TextractFetcher extends 
AwsMLFetcherProcessor<AmazonTextractClient> {

Review Comment:
   Fixed, thx



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMLFetcherProcessor.java:
##########
@@ -0,0 +1,117 @@
+package org.apache.nifi.processors.aws.ml;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ResponseMetadata;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.http.SdkHttpMetadata;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+abstract public class AwsMLFetcherProcessor<SERVICE extends 
AmazonWebServiceClient>
+        extends AbstractAWSCredentialsProviderProcessor<SERVICE>  {
+    public static final String AWS_TASK_ID_PROPERTY = "awsTaskId";
+    public static final String AWS_TASK_OUTPUT_LOCATION = "outputLocation";
+    public static final PropertyDescriptor 
MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE =
+            new 
PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE)
+                    .required(true)
+                    .build();
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("Upon successful completion, the original FlowFile 
will be routed to this relationship.")
+            .autoTerminateDefault(true)
+            .build();
+    public static final Relationship REL_IN_PROGRESS = new 
Relationship.Builder()
+            .name("in progress")
+            .description("The job is currently still being processed")
+            .build();
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Job successfully finished. FlowFile will be routed 
to this relation.")
+            .build();
+    public final Relationship REL_PARTIAL_SUCCESS = new Relationship.Builder()
+            .name("partial success")
+            .description("Retrieving results failed for some reason, but the 
issue is likely to resolve on its own, such as Provisioned Throughput Exceeded 
or a Throttling failure. " +
+                    "It is generally expected to retry this relationship.")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("The job failed, the original FlowFile will be routed 
to this relationship.")
+            .autoTerminateDefault(true)
+            .build();
+    protected final ObjectMapper mapper = JsonMapper.builder()
+            .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE,
+            TIMEOUT,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            PROXY_CONFIGURATION_SERVICE,
+            PROXY_HOST,
+            PROXY_HOST_PORT,
+            PROXY_USERNAME,
+            PROXY_PASSWORD));
+
+    public static final String FAILURE_REASON_ATTRIBUTE = "failure.reason";
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    private static final Set<Relationship> relationships = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_ORIGINAL,
+            REL_SUCCESS,
+            REL_IN_PROGRESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    protected SERVICE createClient(ProcessContext context, AWSCredentials 
credentials, ClientConfiguration config) {
+        throw new UnsupportedOperationException("Tried to create client in a 
deprecated way.");
+    }
+
+    @Override
+    protected void init(ProcessorInitializationContext context) {
+        SimpleModule module = new SimpleModule();
+        module.addDeserializer(ResponseMetadata.class, new 
AwsResponseMetadataDeserializer());
+        SimpleModule module2 = new SimpleModule();

Review Comment:
   Thanks, I fixed that.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMLFetcherProcessor.java:
##########
@@ -0,0 +1,117 @@
+package org.apache.nifi.processors.aws.ml;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ResponseMetadata;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.http.SdkHttpMetadata;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+abstract public class AwsMLFetcherProcessor<SERVICE extends 
AmazonWebServiceClient>
+        extends AbstractAWSCredentialsProviderProcessor<SERVICE>  {
+    public static final String AWS_TASK_ID_PROPERTY = "awsTaskId";
+    public static final String AWS_TASK_OUTPUT_LOCATION = "outputLocation";
+    public static final PropertyDescriptor 
MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE =
+            new 
PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE)
+                    .required(true)
+                    .build();
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("Upon successful completion, the original FlowFile 
will be routed to this relationship.")
+            .autoTerminateDefault(true)
+            .build();
+    public static final Relationship REL_IN_PROGRESS = new 
Relationship.Builder()
+            .name("in progress")
+            .description("The job is currently still being processed")
+            .build();
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Job successfully finished. FlowFile will be routed 
to this relation.")
+            .build();
+    public final Relationship REL_PARTIAL_SUCCESS = new Relationship.Builder()
+            .name("partial success")
+            .description("Retrieving results failed for some reason, but the 
issue is likely to resolve on its own, such as Provisioned Throughput Exceeded 
or a Throttling failure. " +
+                    "It is generally expected to retry this relationship.")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("The job failed, the original FlowFile will be routed 
to this relationship.")
+            .autoTerminateDefault(true)
+            .build();
+    protected final ObjectMapper mapper = JsonMapper.builder()
+            .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE,
+            TIMEOUT,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            PROXY_CONFIGURATION_SERVICE,
+            PROXY_HOST,
+            PROXY_HOST_PORT,
+            PROXY_USERNAME,
+            PROXY_PASSWORD));
+
+    public static final String FAILURE_REASON_ATTRIBUTE = "failure.reason";
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    private static final Set<Relationship> relationships = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_ORIGINAL,
+            REL_SUCCESS,
+            REL_IN_PROGRESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    protected SERVICE createClient(ProcessContext context, AWSCredentials 
credentials, ClientConfiguration config) {
+        throw new UnsupportedOperationException("Tried to create client in a 
deprecated way.");
+    }
+
+    @Override
+    protected void init(ProcessorInitializationContext context) {
+        SimpleModule module = new SimpleModule();
+        module.addDeserializer(ResponseMetadata.class, new 
AwsResponseMetadataDeserializer());
+        SimpleModule module2 = new SimpleModule();
+        module.addDeserializer(SdkHttpMetadata.class, new 
SdkHttpMetadataDeserializer());
+        mapper.registerModule(module);
+        mapper.registerModule(module2);
+    }
+
+
+    protected void writeToFlowFile(ProcessSession session, FlowFile flowFile, 
Object response) {
+        session.write(flowFile, out -> {
+            try (BufferedWriter bufferedWriter = new BufferedWriter(new 
OutputStreamWriter(out, StandardCharsets.UTF_8))) {
+                bufferedWriter.write(mapper.writeValueAsString(response));
+                bufferedWriter.newLine();

Review Comment:
   Not needed, I removed.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/TextractFetcher.java:
##########
@@ -0,0 +1,68 @@
+package org.apache.nifi.processors.aws.ml.textract;
+
+import static 
org.apache.nifi.processors.aws.ml.textract.TextractProcessor.TYPE;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.AmazonTextractClient;
+import com.amazonaws.services.textract.model.JobStatus;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.ml.AwsMLFetcherProcessor;
+import org.apache.nifi.processors.aws.ml.polly.PollyFetcher;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Textract"})
+@CapabilityDescription("Automatically extract printed text, handwriting, and 
data from any document")
+@SeeAlso({TextractProcessor.class})
+public class TextractFetcher extends 
AwsMLFetcherProcessor<AmazonTextractClient> {
+    public static final String DOCUMENT_ANALYSIS = "Document Analysis";
+    public static final String DOCUMENT_TEXT_DETECTION = "Document Text 
Detection";
+    public static final String EXPENSE_ANALYSIS = "Expense Analysis";
+    private static final Map<String, AwsTextractTaskAware> UTIL_MAPPING =
+            ImmutableMap.of(DOCUMENT_ANALYSIS, new 
TextractDocumentAnalysisResultUtil(),
+                    DOCUMENT_TEXT_DETECTION, new 
TextractDocumentTextDetectionUtil(),
+                    EXPENSE_ANALYSIS, new 
TextractDocumentExpenseAnalysisUtil());
+
+    @Override
+    protected AmazonTextractClient createClient(ProcessContext context, 
AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
+        return (AmazonTextractClient) AmazonTextractClient.builder().build();
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        String typeOfTextract = flowFile.getAttribute(TYPE.getName());
+        AwsTextractTaskAware util = UTIL_MAPPING.get(typeOfTextract);

Review Comment:
   I have removed the mapping, thx



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/PollyProcessor.java:
##########
@@ -0,0 +1,38 @@
+package org.apache.nifi.processors.aws.ml.polly;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.polly.AmazonPollyClient;
+import com.amazonaws.services.polly.AmazonPollyClientBuilder;
+import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskRequest;
+import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskResult;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.ml.AwsMlProcessor;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"})
+@CapabilityDescription("Turn text into lifelike speech using deep learning.")
+@SeeAlso({PollyFetcher.class})
+public class PollyProcessor extends AwsMlProcessor<AmazonPollyClient, 
StartSpeechSynthesisTaskRequest, StartSpeechSynthesisTaskResult> {

Review Comment:
   Fixed, thank you.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMLFetcherProcessor.java:
##########
@@ -0,0 +1,117 @@
+package org.apache.nifi.processors.aws.ml;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ResponseMetadata;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.http.SdkHttpMetadata;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+abstract public class AwsMLFetcherProcessor<SERVICE extends 
AmazonWebServiceClient>
+        extends AbstractAWSCredentialsProviderProcessor<SERVICE>  {
+    public static final String AWS_TASK_ID_PROPERTY = "awsTaskId";
+    public static final String AWS_TASK_OUTPUT_LOCATION = "outputLocation";
+    public static final PropertyDescriptor 
MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE =
+            new 
PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE)
+                    .required(true)
+                    .build();
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("Upon successful completion, the original FlowFile 
will be routed to this relationship.")
+            .autoTerminateDefault(true)
+            .build();
+    public static final Relationship REL_IN_PROGRESS = new 
Relationship.Builder()
+            .name("in progress")
+            .description("The job is currently still being processed")
+            .build();
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Job successfully finished. FlowFile will be routed 
to this relation.")
+            .build();
+    public final Relationship REL_PARTIAL_SUCCESS = new Relationship.Builder()
+            .name("partial success")
+            .description("Retrieving results failed for some reason, but the 
issue is likely to resolve on its own, such as Provisioned Throughput Exceeded 
or a Throttling failure. " +
+                    "It is generally expected to retry this relationship.")
+            .build();

Review Comment:
   Relation has been renamed



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMlProcessor.java:
##########
@@ -0,0 +1,148 @@
+package org.apache.nifi.processors.aws.ml;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.AmazonWebServiceResult;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+public abstract class AwsMlProcessor<SERVICE extends AmazonWebServiceClient, 
REQUEST extends AmazonWebServiceRequest, RESPONSE extends 
AmazonWebServiceResult> extends 
AbstractAWSCredentialsProviderProcessor<SERVICE> {
+    protected static final String AWS_TASK_ID_PROPERTY = "awsTaskId";
+    public static final PropertyDescriptor JSON_PAYLOAD = new 
PropertyDescriptor.Builder()
+            .name("json-payload")
+            .displayName("JSON Payload")
+            .description("JSON Payload that represent an AWS ML Request. See 
more details in AWS API documentation.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final PropertyDescriptor 
MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE =
+            new 
PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE)
+                    .required(true)
+                    .build();
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            JSON_PAYLOAD,
+            MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE,
+            TIMEOUT,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            PROXY_CONFIGURATION_SERVICE,
+            PROXY_HOST,
+            PROXY_HOST_PORT,
+            PROXY_USERNAME,
+            PROXY_PASSWORD));
+    private final ObjectMapper mapper = JsonMapper.builder()
+            .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
+            .build();
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        RESPONSE response;
+        try {
+            response = sendRequest(buildRequest(session, context, flowFile), 
context);
+        } catch (Exception e) {
+            session.transfer(flowFile, REL_FAILURE);
+            getLogger().error("Exception was thrown during sending AWS ML 
request.", e);

Review Comment:
   Thanks, it has been updated based on your suggestions.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/PollyProcessor.java:
##########
@@ -0,0 +1,38 @@
+package org.apache.nifi.processors.aws.ml.polly;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.polly.AmazonPollyClient;
+import com.amazonaws.services.polly.AmazonPollyClientBuilder;
+import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskRequest;
+import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskResult;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.ml.AwsMlProcessor;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"})
+@CapabilityDescription("Turn text into lifelike speech using deep learning.")

Review Comment:
   Thanks, I've updated them.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/PollyFetcher.java:
##########
@@ -0,0 +1,63 @@
+package org.apache.nifi.processors.aws.ml.polly;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.polly.AmazonPollyClient;
+import com.amazonaws.services.polly.AmazonPollyClientBuilder;
+import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskRequest;
+import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskResult;
+import com.amazonaws.services.polly.model.TaskStatus;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.ml.AwsMLFetcherProcessor;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"})
+@CapabilityDescription("Turn text into lifelike speech using deep learning. 
Checking Polly synthesis job's status.")

Review Comment:
   I've updated the descriptions. thx



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMlProcessor.java:
##########
@@ -0,0 +1,148 @@
+package org.apache.nifi.processors.aws.ml;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.AmazonWebServiceResult;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+public abstract class AwsMlProcessor<SERVICE extends AmazonWebServiceClient, 
REQUEST extends AmazonWebServiceRequest, RESPONSE extends 
AmazonWebServiceResult> extends 
AbstractAWSCredentialsProviderProcessor<SERVICE> {
+    protected static final String AWS_TASK_ID_PROPERTY = "awsTaskId";
+    public static final PropertyDescriptor JSON_PAYLOAD = new 
PropertyDescriptor.Builder()
+            .name("json-payload")
+            .displayName("JSON Payload")
+            .description("JSON Payload that represent an AWS ML Request. See 
more details in AWS API documentation.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final PropertyDescriptor 
MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE =
+            new 
PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE)
+                    .required(true)
+                    .build();
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            JSON_PAYLOAD,
+            MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE,
+            TIMEOUT,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            PROXY_CONFIGURATION_SERVICE,
+            PROXY_HOST,
+            PROXY_HOST_PORT,
+            PROXY_USERNAME,
+            PROXY_PASSWORD));
+    private final ObjectMapper mapper = JsonMapper.builder()
+            .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
+            .build();
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        RESPONSE response;
+        try {
+            response = sendRequest(buildRequest(session, context, flowFile), 
context);
+        } catch (Exception e) {
+            session.transfer(flowFile, REL_FAILURE);
+            getLogger().error("Exception was thrown during sending AWS ML 
request.", e);
+            return;
+        }
+
+        try {
+            writeToFlowFile(session, flowFile, response);
+        } catch (Exception e) {
+            session.transfer(flowFile, REL_FAILURE);
+            getLogger().error("Exception was thrown during writing aws 
response to flow file.", e);
+            return;
+        }
+
+        try {
+            postProcessFlowFile(context, session, flowFile, response);
+        } catch (Exception e) {
+            session.transfer(flowFile, REL_FAILURE);
+            getLogger().error("Exception was thrown during AWS ML post 
processing.", e);
+            return;
+        }
+
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+
+    protected void postProcessFlowFile(ProcessContext context, ProcessSession 
session, FlowFile flowFile, RESPONSE response) {
+        session.putAttribute(flowFile, AWS_TASK_ID_PROPERTY, 
getAwsTaskId(context, response));
+        getLogger().debug("AWS ML task has been started with task id: {}", 
getAwsTaskId(context, response));
+    }
+
+    protected REQUEST buildRequest(ProcessSession session, ProcessContext 
context, FlowFile flowFile) throws JsonProcessingException {
+        REQUEST request;
+        try {
+            request = mapper.readValue(getPayload(session, context, flowFile),
+                    getAwsRequestClass(context));
+        } catch (JsonProcessingException e) {
+            getLogger().error("Exception was thrown during AWS ML request 
creation.", e);
+            throw e;

Review Comment:
   Thank you, I've updated it.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/PollyFetcher.java:
##########
@@ -0,0 +1,63 @@
+package org.apache.nifi.processors.aws.ml.polly;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.polly.AmazonPollyClient;
+import com.amazonaws.services.polly.AmazonPollyClientBuilder;
+import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskRequest;
+import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskResult;
+import com.amazonaws.services.polly.model.TaskStatus;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.ml.AwsMLFetcherProcessor;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"})
+@CapabilityDescription("Turn text into lifelike speech using deep learning. 
Checking Polly synthesis job's status.")
+public class PollyFetcher extends AwsMLFetcherProcessor<AmazonPollyClient> {
+
+    @Override
+    protected AmazonPollyClient createClient(ProcessContext context, 
AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
+        return (AmazonPollyClient) 
AmazonPollyClientBuilder.standard().withCredentials(credentialsProvider).build();
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        GetSpeechSynthesisTaskResult speechSynthesisTask = 
getSynthesisTask(flowFile);

Review Comment:
   According to the aws doc `ThrottlingException` will be thrown when we exceed 
the rate limit. So I added a separated catch for that and kept the existing 
general purpose catch.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/TextractFetcher.java:
##########
@@ -0,0 +1,68 @@
+package org.apache.nifi.processors.aws.ml.textract;
+
+import static 
org.apache.nifi.processors.aws.ml.textract.TextractProcessor.TYPE;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.AmazonTextractClient;
+import com.amazonaws.services.textract.model.JobStatus;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.ml.AwsMLFetcherProcessor;
+import org.apache.nifi.processors.aws.ml.polly.PollyFetcher;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Textract"})
+@CapabilityDescription("Automatically extract printed text, handwriting, and 
data from any document")
+@SeeAlso({TextractProcessor.class})
+public class TextractFetcher extends 
AwsMLFetcherProcessor<AmazonTextractClient> {
+    public static final String DOCUMENT_ANALYSIS = "Document Analysis";
+    public static final String DOCUMENT_TEXT_DETECTION = "Document Text 
Detection";
+    public static final String EXPENSE_ANALYSIS = "Expense Analysis";
+    private static final Map<String, AwsTextractTaskAware> UTIL_MAPPING =
+            ImmutableMap.of(DOCUMENT_ANALYSIS, new 
TextractDocumentAnalysisResultUtil(),
+                    DOCUMENT_TEXT_DETECTION, new 
TextractDocumentTextDetectionUtil(),
+                    EXPENSE_ANALYSIS, new 
TextractDocumentExpenseAnalysisUtil());
+
+    @Override
+    protected AmazonTextractClient createClient(ProcessContext context, 
AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
+        return (AmazonTextractClient) AmazonTextractClient.builder().build();
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        String typeOfTextract = flowFile.getAttribute(TYPE.getName());

Review Comment:
   Thanks, fixed



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/TextractFetcher.java:
##########
@@ -0,0 +1,68 @@
+package org.apache.nifi.processors.aws.ml.textract;
+
+import static 
org.apache.nifi.processors.aws.ml.textract.TextractProcessor.TYPE;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.AmazonTextractClient;
+import com.amazonaws.services.textract.model.JobStatus;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.ml.AwsMLFetcherProcessor;
+import org.apache.nifi.processors.aws.ml.polly.PollyFetcher;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Textract"})
+@CapabilityDescription("Automatically extract printed text, handwriting, and 
data from any document")

Review Comment:
   Yep, fixed thank you.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsResponseMetadataDeserializer.java:
##########
@@ -0,0 +1,19 @@
+package org.apache.nifi.processors.aws.ml;

Review Comment:
   I have fixed these, thank you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to