nandorsoma commented on code in PR #5656:
URL: https://github.com/apache/nifi/pull/5656#discussion_r870019400


##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/polly/StartSpeechSynthesisTask.java:
##########
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.polly;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.polly.AmazonPollyAsync;
+import com.amazonaws.services.polly.AmazonPollyAsyncClient;
+import com.amazonaws.services.polly.AmazonPollyAsyncClientBuilder;
+import com.amazonaws.services.polly.model.EngineNotSupportedException;
+import com.amazonaws.services.polly.model.InvalidS3BucketException;
+import com.amazonaws.services.polly.model.InvalidS3KeyException;
+import com.amazonaws.services.polly.model.InvalidSampleRateException;
+import com.amazonaws.services.polly.model.InvalidSnsTopicArnException;
+import com.amazonaws.services.polly.model.InvalidSsmlException;
+import com.amazonaws.services.polly.model.LanguageNotSupportedException;
+import com.amazonaws.services.polly.model.LexiconNotFoundException;
+import com.amazonaws.services.polly.model.MarksNotSupportedForFormatException;
+import 
com.amazonaws.services.polly.model.SsmlMarksNotSupportedForTextTypeException;
+import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskRequest;
+import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskResult;
+import com.amazonaws.services.polly.model.TextLengthExceededException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+@CapabilityDescription("Sends the contents of an incoming FlowFile to Amazon 
Polly service in order to synthesize speech from the FlowFile's textual 
content. This processor does not wait for the " +
+    "results but rather starts the task. Once the task has started, the 
outgoing FlowFile will have attributes added pointing to the Amazon Polly Task. 
Those results can then be fetched using the " +
+    "FetchSpeechSynthesisResults. See this Processor's Additional Details for 
more information.")
+@WritesAttributes({
+    @WritesAttribute(attribute = "aws.polly.task.id", description = "The ID of 
the Amazon Polly task that was created. This can be used by 
FetchSpeechSynthesisResults to get the results of this " +
+        "task."),
+    @WritesAttribute(attribute = "aws.polly.task.creation.time", description = 
 "The timestamp (in milliseconds since epoch) that the Amazon Polly task was 
created.")
+})
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"aws", "cloud", "text", "polly", "ml", "ai", "machine learning", 
"artificial intelligence", "speech", "text-to-speech", "unstructured"})
+@SeeAlso({FetchSpeechSynthesisResults.class})
+public class StartSpeechSynthesisTask extends 
AbstractAWSCredentialsProviderProcessor<AmazonPollyAsyncClient> {
+    private static final String FAILURE_REASON_ATTRIBUTE = "failure.reason";
+
+    private static final AllowableValue TEXT_TYPE_SSML = new 
AllowableValue("ssml", "Speech Synthesis Markup Language (SSML)", "Input is in 
SSML format");
+    private static final AllowableValue TEXT_TYPE_PLAIN_TEXT = new 
AllowableValue("text", "Plain Text", "Input is in plain text format");
+    private static final AllowableValue TEXT_TYPE_USE_ATTRIBUTE = new 
AllowableValue("attribute", "Use 'mime.type' Attribute", "The mime.type 
attribute will be used to determine whether the input " +
+        "is in SSML format or plain text. If the attribute is not present or 
one of the known MIME types for SSML, then plain text will be used.");
+    private static final Set<String> SSML_MIME_TYPES = new 
HashSet<>(Arrays.asList("application/voicexml+xml", "application/ssml+xml", 
"application/srgs",
+        "application/srgs+xml", "application/ccxml+xml", 
"application/pls+xml"));
+
+    private static final String VOICE_ID_ATTRIBUTE = "aws.voice.id";
+    private static final String USE_VOICE_ID_ATTRIBUTE = "Use '" + 
VOICE_ID_ATTRIBUTE + "' Attribute";
+
+    static final PropertyDescriptor TEXT_TYPE = new 
PropertyDescriptor.Builder()
+        .name("Text Type")
+        .displayName("Text Type")
+        .description("The format of the FlowFile content")
+        .required(true)
+        .allowableValues(TEXT_TYPE_SSML, TEXT_TYPE_PLAIN_TEXT, 
TEXT_TYPE_USE_ATTRIBUTE)
+        .defaultValue(TEXT_TYPE_USE_ATTRIBUTE.getValue())
+        .build();
+    static final PropertyDescriptor VOICE_ID = new PropertyDescriptor.Builder()
+        .name("Voice ID")
+        .displayName("Voice ID")
+        .description("The identifier for which of the voices should be used to 
synthesize speech")
+        .required(true)
+        .allowableValues("Aditi", "Amy", "Astrid", "Bianca", "Brian", 
"Camila", "Carla", "Carmen", "Celine", "Chantal", "Conchita", "Cristiano",
+            "Dora", "Emma", "Enrique", "Ewa", "Filiz", "Gabrielle", "Geraint", 
"Giorgio", "Gwyneth", "Hans", "Ines", "Ivy",
+            "Jacek", "Jan", "Joanna", "Joey", "Justin", "Karl", "Kendra", 
"Kevin", "Kimberly", "Lea", "Liv", "Lotte", "Lucia", "Lupe",
+            "Mads", "Maja", "Marlene", "Mathieu", "Matthew", "Maxim", "Mia", 
"Miguel", "Mizuki", "Naja", "Nicole", "Olivia",
+            "Penelope", "Raveena", "Ricardo", "Ruben", "Russell", "Salli", 
"Seoyeon", "Takumi", "Tatyana", "Vicki", "Vitoria",
+            "Zeina", "Zhiyu", "Aria", "Ayanda", USE_VOICE_ID_ATTRIBUTE)
+        .defaultValue("Amy")
+        .build();
+    static final PropertyDescriptor CHARACTER_SET = new 
PropertyDescriptor.Builder()
+        .name("Character Set")
+        .displayName("Character Set")
+        .description("The character set of the FlowFile content")
+        .required(true)
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .defaultValue("UTF-8")
+        .build();
+    static final PropertyDescriptor ENGINE = new PropertyDescriptor.Builder()
+        .name("Engine")
+        .displayName("Engine")
+        .description("Which Engine to use for performing the speech synthesis")
+        .required(true)
+        .allowableValues("standard", "neural")
+        .defaultValue("standard")
+        .build();
+    static final PropertyDescriptor OUTPUT_FORMAT = new 
PropertyDescriptor.Builder()
+        .name("Output Format")
+        .displayName("Output Format")
+        .description("Specifies the format that Amazon Polly should generate")
+        .required(true)
+        .allowableValues("json", "mp3", "ogg_vorbis", "pcm")
+        .defaultValue("mp3")
+        .build();
+    static final PropertyDescriptor OUTPUT_S3_BUCKET_NAME = new 
PropertyDescriptor.Builder()
+        .name("Output S3 Bucket Name")
+        .displayName("Output S3 Bucket Name")
+        .description("The name of the S3 bucket that Polly should write the 
results to")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .build();
+    static final PropertyDescriptor OUTPUT_S3_KEY_PREFIX = new 
PropertyDescriptor.Builder()
+        .name("Output S3 Key Prefix")
+        .displayName("Output S3 Key Prefix")
+        .description("The prefix that Polly should use for naming the S3 
Object that it writes the results to")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .build();
+    static final PropertyDescriptor SAMPLE_RATE = new 
PropertyDescriptor.Builder()
+        .name("Sample Rate")
+        .displayName("Sample Rate")
+        .description("The Sample Rate that should be used for synthesizing 
speech")
+        .required(true)
+        .allowableValues("8000", "16000", "22050", "24000")
+        .defaultValue("22050")
+        .build();
+    static final PropertyDescriptor SNS_TOPIC_ARN = new 
PropertyDescriptor.Builder()
+        .name("SNS Topic ARN")
+        .displayName("SNS Topic ARN")
+        .description("If specified, Polly will place a notification on the SNS 
topic whose ARN is specified when the speech synthesis is complete.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .build();
+    static final PropertyDescriptor SPEECH_MARK_TYPES = new 
PropertyDescriptor.Builder()
+        .name("Speech Mark Types")
+        .displayName("Speech Mark Types")
+        .description("A comma-separted list of Speech Mark Types that should 
be returned for the input text. Valid values are 'sentence', 'ssml', 'viseme', 
and 'word'")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .dependsOn(OUTPUT_FORMAT, "json")
+        .build();
+    static final PropertyDescriptor LANGUAGE_CODE = new 
PropertyDescriptor.Builder()
+        .name("Language Code")
+        .displayName("Language Code")
+        .description("The Language Code to use for speech synthesis if the 
selected voice is bilingual")
+        .required(false)
+        .allowableValues("arb", "cmn-CN", "cy-GB", "da-DK", "de-DE", "en-AU", 
"en-GB", "en-GB-WLS", "en-IN", "en-US", "es-ES", "es-MX", "es-US",
+            "fr-CA", "fr-FR", "is-IS", "it-IT", "ja-JP", "hi-IN", "ko-KR", 
"nb-NO", "nl-NL", "pl-PL", "pt-BR", "pt-PT", "ro-RO", "ru-RU", "sv-SE",
+            "tr-TR", "en-NZ", "en-ZA")
+        .build();
+    static final PropertyDescriptor LEXICON_NAMES = new 
PropertyDescriptor.Builder()
+        .name("Lexicon Names")
+        .displayName("Lexicon Names")
+        .description("A comma-separated list of Lexicon Names that should be 
used for speech synthesis")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    private static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(Arrays.asList(
+        TEXT_TYPE,
+        VOICE_ID,
+        CHARACTER_SET,
+        ENGINE,
+        OUTPUT_FORMAT,
+        OUTPUT_S3_BUCKET_NAME,
+        OUTPUT_S3_KEY_PREFIX,
+        SAMPLE_RATE,
+        SNS_TOPIC_ARN,
+        SPEECH_MARK_TYPES,
+        AWS_CREDENTIALS_PROVIDER_SERVICE,

Review Comment:
   This property should probably be required; otherwise, a NullPointerException is 
thrown when the processor is started:
   ```
   java.lang.NullPointerException: null
           at 
org.apache.nifi.processors.aws.polly.StartSpeechSynthesisTask.onTrigger(StartSpeechSynthesisTask.java:284)
   ```



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/textract/StartTextractJob.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.textract;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.AmazonTextractAsync;
+import com.amazonaws.services.textract.AmazonTextractAsyncClient;
+import com.amazonaws.services.textract.AmazonTextractAsyncClientBuilder;
+import com.amazonaws.services.textract.model.BadDocumentException;
+import com.amazonaws.services.textract.model.DocumentLocation;
+import com.amazonaws.services.textract.model.DocumentTooLargeException;
+import com.amazonaws.services.textract.model.HumanLoopQuotaExceededException;
+import 
com.amazonaws.services.textract.model.IdempotentParameterMismatchException;
+import com.amazonaws.services.textract.model.InternalServerErrorException;
+import com.amazonaws.services.textract.model.InvalidKMSKeyException;
+import com.amazonaws.services.textract.model.InvalidParameterException;
+import com.amazonaws.services.textract.model.InvalidS3ObjectException;
+import com.amazonaws.services.textract.model.NotificationChannel;
+import com.amazonaws.services.textract.model.OutputConfig;
+import 
com.amazonaws.services.textract.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.textract.model.S3Object;
+import com.amazonaws.services.textract.model.StartDocumentAnalysisRequest;
+import com.amazonaws.services.textract.model.StartDocumentAnalysisResult;
+import com.amazonaws.services.textract.model.StartDocumentTextDetectionRequest;
+import com.amazonaws.services.textract.model.StartDocumentTextDetectionResult;
+import com.amazonaws.services.textract.model.StartExpenseAnalysisRequest;
+import com.amazonaws.services.textract.model.StartExpenseAnalysisResult;
+import com.amazonaws.services.textract.model.ThrottlingException;
+import com.amazonaws.services.textract.model.UnsupportedDocumentException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+import 
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static 
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static 
org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR;
+
+@CapabilityDescription("Starts a Textract Job to analyze and extract 
information from an image or PDF file that is stored in Amazon S3. This 
processor initiates the processing but does not wait for" +
+    " the result. Instead, it adds attributes indicating the identifier of the 
Textract Job and the top of job that it was. See Additional Details for more 
information on the expected usage of this" +
+    " Processor and how it can interact with other processors to obtain the 
desired result.")
+@WritesAttributes({
+    @WritesAttribute(attribute = "textract.job.id", description = "The ID of 
the textract job"),
+    @WritesAttribute(attribute = "textract.action", description = "The type of 
action being performed")
+})
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"aws", "cloud", "text", "textract", "unstructured", "detect", 
"analyze"})
+@SeeAlso({AWSCredentialsProviderService.class, FetchTextractResults.class})
+public class StartTextractJob extends 
AbstractAWSCredentialsProviderProcessor<AmazonTextractAsyncClient> {
+    private static final String FAILURE_REASON_ATTRIBUTE = "failure.reason";
+
+    static final PropertyDescriptor DOCUMENT_S3_BUCKET = new 
PropertyDescriptor.Builder()
+        .name("Document S3 Bucket")
+        .displayName("Document S3 Bucket")
+        .description("The name of the S3 Bucket that contains the document")
+        .required(true)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .defaultValue("${s3.bucket}")

Review Comment:
   OUTPUT_S3_BUCKET has no default value. Why does this property have a default 
value when that one does not? For consistency, I would either remove the default 
value here or add one to OUTPUT_S3_BUCKET.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/textract/FetchTextractResults.java:
##########
@@ -0,0 +1,715 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.textract;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.AmazonTextractAsync;
+import com.amazonaws.services.textract.AmazonTextractAsyncClient;
+import com.amazonaws.services.textract.AmazonTextractAsyncClientBuilder;
+import com.amazonaws.services.textract.model.Block;
+import com.amazonaws.services.textract.model.DocumentMetadata;
+import com.amazonaws.services.textract.model.ExpenseDocument;
+import com.amazonaws.services.textract.model.ExpenseField;
+import com.amazonaws.services.textract.model.GetDocumentAnalysisRequest;
+import com.amazonaws.services.textract.model.GetDocumentAnalysisResult;
+import com.amazonaws.services.textract.model.GetDocumentTextDetectionRequest;
+import com.amazonaws.services.textract.model.GetDocumentTextDetectionResult;
+import com.amazonaws.services.textract.model.GetExpenseAnalysisRequest;
+import com.amazonaws.services.textract.model.GetExpenseAnalysisResult;
+import com.amazonaws.services.textract.model.InternalServerErrorException;
+import com.amazonaws.services.textract.model.JobStatus;
+import com.amazonaws.services.textract.model.LineItemFields;
+import com.amazonaws.services.textract.model.LineItemGroup;
+import 
com.amazonaws.services.textract.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.textract.model.ThrottlingException;
+import com.amazonaws.services.textract.model.Warning;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+import 
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+import org.apache.nifi.stream.io.NonCloseableOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+@CapabilityDescription("Retrieves the results of a Textract job that was 
triggered via StartTextractJob. If the job has not completed yet, the incoming 
FlowFile will be routed to the 'in progress' " +
+    "Relationship, providing the ability to retry after the Penalization 
period. Otherwise, the original FlowFile will be transferred to the 'original' 
relationship while the results of the " +
+    "Textract job are transferred as JSON to the 'success' or 'partial 
success' relationship. This processor is typically used in conjunction with 
GetSQS and/or StartTextractJob. See Additional " +
+    "Details for more information.")
+@SupportsBatching
+@SideEffectFree
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"aws", "cloud", "text", "textract", "unstructured", "detect", 
"analyze"})
+@SeeAlso({AWSCredentialsProviderService.class, StartTextractJob.class, 
InvokeTextract.class})
+public class FetchTextractResults extends 
AbstractAWSCredentialsProviderProcessor<AmazonTextractAsyncClient> {

Review Comment:
   This processor fetches the actual result of the Textract job and writes it into 
the FlowFile's content. FetchSpeechSynthesisResults, on the other hand, only fetches 
the task status; the actual result is retrieved separately with the FetchS3Object 
processor. Wouldn't it make sense to align this processor's behavior with that one, 
so the two features behave consistently?



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/textract/StartTextractJob.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.textract;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.AmazonTextractAsync;
+import com.amazonaws.services.textract.AmazonTextractAsyncClient;
+import com.amazonaws.services.textract.AmazonTextractAsyncClientBuilder;
+import com.amazonaws.services.textract.model.BadDocumentException;
+import com.amazonaws.services.textract.model.DocumentLocation;
+import com.amazonaws.services.textract.model.DocumentTooLargeException;
+import com.amazonaws.services.textract.model.HumanLoopQuotaExceededException;
+import 
com.amazonaws.services.textract.model.IdempotentParameterMismatchException;
+import com.amazonaws.services.textract.model.InternalServerErrorException;
+import com.amazonaws.services.textract.model.InvalidKMSKeyException;
+import com.amazonaws.services.textract.model.InvalidParameterException;
+import com.amazonaws.services.textract.model.InvalidS3ObjectException;
+import com.amazonaws.services.textract.model.NotificationChannel;
+import com.amazonaws.services.textract.model.OutputConfig;
+import 
com.amazonaws.services.textract.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.textract.model.S3Object;
+import com.amazonaws.services.textract.model.StartDocumentAnalysisRequest;
+import com.amazonaws.services.textract.model.StartDocumentAnalysisResult;
+import com.amazonaws.services.textract.model.StartDocumentTextDetectionRequest;
+import com.amazonaws.services.textract.model.StartDocumentTextDetectionResult;
+import com.amazonaws.services.textract.model.StartExpenseAnalysisRequest;
+import com.amazonaws.services.textract.model.StartExpenseAnalysisResult;
+import com.amazonaws.services.textract.model.ThrottlingException;
+import com.amazonaws.services.textract.model.UnsupportedDocumentException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+import 
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static 
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static 
org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR;
+
+@CapabilityDescription("Starts a Textract Job to analyze and extract 
information from an image or PDF file that is stored in Amazon S3. This 
processor initiates the processing but does not wait for" +
+    " the result. Instead, it adds attributes indicating the identifier of the 
Textract Job and the top of job that it was. See Additional Details for more 
information on the expected usage of this" +
+    " Processor and how it can interact with other processors to obtain the 
desired result.")
+@WritesAttributes({
+    @WritesAttribute(attribute = "textract.job.id", description = "The ID of 
the textract job"),
+    @WritesAttribute(attribute = "textract.action", description = "The type of 
action being performed")
+})
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"aws", "cloud", "text", "textract", "unstructured", "detect", 
"analyze"})
+@SeeAlso({AWSCredentialsProviderService.class, FetchTextractResults.class})
+public class StartTextractJob extends 
AbstractAWSCredentialsProviderProcessor<AmazonTextractAsyncClient> {
+    private static final String FAILURE_REASON_ATTRIBUTE = "failure.reason";
+
+    static final PropertyDescriptor DOCUMENT_S3_BUCKET = new 
PropertyDescriptor.Builder()
+        .name("Document S3 Bucket")
+        .displayName("Document S3 Bucket")
+        .description("The name of the S3 Bucket that contains the document")
+        .required(true)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .defaultValue("${s3.bucket}")
+        .build();
+    static final PropertyDescriptor DOCUMENT_S3_NAME = new 
PropertyDescriptor.Builder()
+        .name("Document Object Name")
+        .displayName("Document Object Name")
+        .description("The name of the document in the S3 bucket to perform 
analysis against")
+        .required(true)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .defaultValue("${filename}")
+        .build();
+    static final PropertyDescriptor DOCUMENT_VERSION = new 
PropertyDescriptor.Builder()
+        .name("Document Version")
+        .displayName("Document Version")
+        .description("The version of the document in the S3 bucket to perform 
analysis against")
+        .required(false)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .build();
+    static final PropertyDescriptor CLIENT_REQUEST_TOKEN = new 
PropertyDescriptor.Builder()
+        .name("Client Request Token")
+        .displayName("Client Request Token")
+        .description("A value that will be sent to AWS in order to uniquely 
identify the request. This prevents accidental job duplication, which could 
lead to unexpected expenses. For example, if " +
+            "NiFi is restarted and the same FlowFile is sent a second time, 
this will result in AWS Textract being smart enough not to process the data 
again. However, if the Processor settings " +
+            "change, this could result in a processing failure. In such a 
case, this value may need to be changed.")
+        .required(true)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .defaultValue("${uuid}")
+        .build();
+    static final PropertyDescriptor JOB_TAG = new PropertyDescriptor.Builder()
+        .name("Job Tag")
+        .displayName("Job Tag")
+        .description("The Job Tag to include in the request to AWS. This can 
be used to identify jobs in the completion statuses that are published to 
Amazon SNS.")
+        .required(false)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .build();
+    static final PropertyDescriptor KMS_KEY_ID = new 
PropertyDescriptor.Builder()
+        .name("KMS Key ID")
+        .displayName("KMS Key ID")
+        .description("If specified, the key whose ID is provided will be used 
to encrypt the results before writing the results to S3")
+        .required(false)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .build();
+    static final PropertyDescriptor SNS_TOPIC_ARN = new 
PropertyDescriptor.Builder()
+        .name("SNS Topic ARN")
+        .displayName("SNS Topic ARN")
+        .description("When Textract completes processing of the document, it 
will place a notification onto the topic with the specified ARN")
+        .required(true)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .build();
+    static final PropertyDescriptor ROLE_ARN = new PropertyDescriptor.Builder()
+        .name("Role ARN")
+        .displayName("Role ARN")
+        .description("The ARN of the Amazon Role that Textract is to use in 
order to publish results to the configured SNS Topic")
+        .required(true)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .build();
+    static final PropertyDescriptor OUTPUT_S3_BUCKET = new 
PropertyDescriptor.Builder()
+        .name("Output S3 Bucket")
+        .displayName("Output S3 Bucket")
+        .description("The name of the S3 Bucket that the Textract output 
should be written to")
+        .required(true)

Review Comment:
   Could we make this property optional? When I tested it, Textract saved the 
results to S3 when it finished the job, and FetchTextractResults also wrote the 
results into the FlowFile's content. How can I avoid this duplication if I don't 
want it?



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/textract/StartTextractJob.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.textract;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.AmazonTextractAsync;
+import com.amazonaws.services.textract.AmazonTextractAsyncClient;
+import com.amazonaws.services.textract.AmazonTextractAsyncClientBuilder;
+import com.amazonaws.services.textract.model.BadDocumentException;
+import com.amazonaws.services.textract.model.DocumentLocation;
+import com.amazonaws.services.textract.model.DocumentTooLargeException;
+import com.amazonaws.services.textract.model.HumanLoopQuotaExceededException;
+import 
com.amazonaws.services.textract.model.IdempotentParameterMismatchException;
+import com.amazonaws.services.textract.model.InternalServerErrorException;
+import com.amazonaws.services.textract.model.InvalidKMSKeyException;
+import com.amazonaws.services.textract.model.InvalidParameterException;
+import com.amazonaws.services.textract.model.InvalidS3ObjectException;
+import com.amazonaws.services.textract.model.NotificationChannel;
+import com.amazonaws.services.textract.model.OutputConfig;
+import 
com.amazonaws.services.textract.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.textract.model.S3Object;
+import com.amazonaws.services.textract.model.StartDocumentAnalysisRequest;
+import com.amazonaws.services.textract.model.StartDocumentAnalysisResult;
+import com.amazonaws.services.textract.model.StartDocumentTextDetectionRequest;
+import com.amazonaws.services.textract.model.StartDocumentTextDetectionResult;
+import com.amazonaws.services.textract.model.StartExpenseAnalysisRequest;
+import com.amazonaws.services.textract.model.StartExpenseAnalysisResult;
+import com.amazonaws.services.textract.model.ThrottlingException;
+import com.amazonaws.services.textract.model.UnsupportedDocumentException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+import 
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static 
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static 
org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR;
+
+@CapabilityDescription("Starts a Textract Job to analyze and extract 
information from an image or PDF file that is stored in Amazon S3. This 
processor initiates the processing but does not wait for" +
+    " the result. Instead, it adds attributes indicating the identifier of the 
Textract Job and the top of job that it was. See Additional Details for more 
information on the expected usage of this" +
+    " Processor and how it can interact with other processors to obtain the 
desired result.")
+@WritesAttributes({
+    @WritesAttribute(attribute = "textract.job.id", description = "The ID of 
the textract job"),
+    @WritesAttribute(attribute = "textract.action", description = "The type of 
action being performed")
+})
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"aws", "cloud", "text", "textract", "unstructured", "detect", 
"analyze"})
+@SeeAlso({AWSCredentialsProviderService.class, FetchTextractResults.class})
+public class StartTextractJob extends 
AbstractAWSCredentialsProviderProcessor<AmazonTextractAsyncClient> {
+    /** Attribute that receives the reason a FlowFile was routed to failure. */
+    private static final String FAILURE_REASON_ATTRIBUTE = "failure.reason";
+
+    /** S3 bucket holding the document to analyze; defaults to the FlowFile's {@code s3.bucket} attribute. */
+    static final PropertyDescriptor DOCUMENT_S3_BUCKET = new PropertyDescriptor.Builder()
+        .name("Document S3 Bucket")
+        .displayName("Document S3 Bucket")
+        .description("The name of the S3 Bucket that contains the document")
+        .required(true)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .defaultValue("${s3.bucket}")
+        .build();
+    /** Name of the document object in the bucket; defaults to the FlowFile's {@code filename} attribute. */
+    static final PropertyDescriptor DOCUMENT_S3_NAME = new PropertyDescriptor.Builder()
+        .name("Document Object Name")
+        .displayName("Document Object Name")
+        .description("The name of the document in the S3 bucket to perform analysis against")
+        .required(true)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .defaultValue("${filename}")
+        .build();
+    /** Optional S3 object version of the document to analyze. */
+    static final PropertyDescriptor DOCUMENT_VERSION = new PropertyDescriptor.Builder()
+        .name("Document Version")
+        .displayName("Document Version")
+        .description("The version of the document in the S3 bucket to perform analysis against")
+        .required(false)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .build();
+    /**
+     * Token sent to AWS so that a re-sent request for the same FlowFile is recognized as a duplicate; defaults to the
+     * FlowFile's {@code uuid}.
+     */
+    static final PropertyDescriptor CLIENT_REQUEST_TOKEN = new PropertyDescriptor.Builder()
+        .name("Client Request Token")
+        .displayName("Client Request Token")
+        .description("A value that will be sent to AWS in order to uniquely identify the request. This prevents accidental job duplication, which could lead to unexpected expenses. For example, if " +
+            "NiFi is restarted and the same FlowFile is sent a second time, this will result in AWS Textract being smart enough not to process the data again. However, if the Processor settings " +
+            "change, this could result in a processing failure. In such a case, this value may need to be changed.")
+        .required(true)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .defaultValue("${uuid}")
+        .build();
+    /** Optional tag included in the request so that jobs can be identified in the SNS completion statuses. */
+    static final PropertyDescriptor JOB_TAG = new PropertyDescriptor.Builder()
+        .name("Job Tag")
+        .displayName("Job Tag")
+        .description("The Job Tag to include in the request to AWS. This can be used to identify jobs in the completion statuses that are published to Amazon SNS.")
+        .required(false)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .build();
+    /** Optional KMS key; when set, Textract encrypts the results with it before writing them to S3. */
+    static final PropertyDescriptor KMS_KEY_ID = new PropertyDescriptor.Builder()
+        .name("KMS Key ID")
+        .displayName("KMS Key ID")
+        .description("If specified, the key whose ID is provided will be used to encrypt the results before writing the results to S3")
+        .required(false)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .build();
+    static final PropertyDescriptor SNS_TOPIC_ARN = new 
PropertyDescriptor.Builder()

Review Comment:
   It's not clear to me why SNS is needed. If I understand correctly, 
FetchTextractResults uses the Textract API to fetch the results and doesn't 
use the SNS topic to get notified.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/textract/FetchTextractResults.java:
##########
@@ -0,0 +1,715 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.textract;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.AmazonTextractAsync;
+import com.amazonaws.services.textract.AmazonTextractAsyncClient;
+import com.amazonaws.services.textract.AmazonTextractAsyncClientBuilder;
+import com.amazonaws.services.textract.model.Block;
+import com.amazonaws.services.textract.model.DocumentMetadata;
+import com.amazonaws.services.textract.model.ExpenseDocument;
+import com.amazonaws.services.textract.model.ExpenseField;
+import com.amazonaws.services.textract.model.GetDocumentAnalysisRequest;
+import com.amazonaws.services.textract.model.GetDocumentAnalysisResult;
+import com.amazonaws.services.textract.model.GetDocumentTextDetectionRequest;
+import com.amazonaws.services.textract.model.GetDocumentTextDetectionResult;
+import com.amazonaws.services.textract.model.GetExpenseAnalysisRequest;
+import com.amazonaws.services.textract.model.GetExpenseAnalysisResult;
+import com.amazonaws.services.textract.model.InternalServerErrorException;
+import com.amazonaws.services.textract.model.JobStatus;
+import com.amazonaws.services.textract.model.LineItemFields;
+import com.amazonaws.services.textract.model.LineItemGroup;
+import 
com.amazonaws.services.textract.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.textract.model.ThrottlingException;
+import com.amazonaws.services.textract.model.Warning;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+import 
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+import org.apache.nifi.stream.io.NonCloseableOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+@CapabilityDescription("Retrieves the results of a Textract job that was 
triggered via StartTextractJob. If the job has not completed yet, the incoming 
FlowFile will be routed to the 'in progress' " +
+    "Relationship, providing the ability to retry after the Penalization 
period. Otherwise, the original FlowFile will be transferred to the 'original' 
relationship while the results of the " +
+    "Textract job are transferred as JSON to the 'success' or 'partial 
success' relationship. This processor is typically used in conjunction with 
GetSQS and/or StartTextractJob. See Additional " +
+    "Details for more information.")
+@SupportsBatching
+@SideEffectFree
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"aws", "cloud", "text", "textract", "unstructured", "detect", 
"analyze"})
+@SeeAlso({AWSCredentialsProviderService.class, StartTextractJob.class, 
InvokeTextract.class})
+public class FetchTextractResults extends 
AbstractAWSCredentialsProviderProcessor<AmazonTextractAsyncClient> {
+    /** Attribute that receives the reason a FlowFile was routed to a failure relationship. */
+    private static final String FAILURE_REASON_ATTRIBUTE = "failure.reason";
+    /** MIME type assigned to result FlowFiles, whose content is written as JSON. Constant: must not be reassigned. */
+    private static final String JSON_MIME_TYPE = "application/json";
+
+    /**
+     * Textract action whose results are fetched. Defaults to {@code TextractProperties.USE_ATTRIBUTE}
+     * (presumably: take the action from a FlowFile attribute — see TextractProperties.getAction).
+     */
+    static final PropertyDescriptor ACTION = new PropertyDescriptor.Builder()
+        .fromPropertyDescriptor(TextractProperties.ACTION)
+        .defaultValue(TextractProperties.USE_ATTRIBUTE.getValue())
+        .build();
+
+    /** ID of the Textract job whose results are fetched; defaults to the {@code textract.job.id} attribute written by StartTextractJob. */
+    static final PropertyDescriptor JOB_ID = new PropertyDescriptor.Builder()
+        .name("Textract Job ID")
+        .displayName("Textract Job ID")
+        .description("The ID of the Textract Job whose results should be fetched")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .defaultValue("${textract.job.id}")
+        .build();
+
+    /** Optional name of an attribute that receives the detected text; when unset, no text attribute is written. */
+    static final PropertyDescriptor TEXT_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
+        .name("Text Attribute Name")
+        .displayName("Text Attribute Name")
+        .description("Specifies the name of an attribute to add to the outbound FlowFiles that include the text that was detected. If not specified, the text will not be added as an attribute.")
+        .required(false)
+        .dependsOn(TextractProperties.ACTION, TextractProperties.DETECT_TEXT, TextractProperties.ANALYZE_DOCUMENT, TextractProperties.USE_ATTRIBUTE)
+        .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+        .build();
+    /** Minimum confidence (1-100, default 70) a line needs in order to be included in the text attribute. */
+    static final PropertyDescriptor TEXT_CONFIDENCE_THRESHOLD = new PropertyDescriptor.Builder()
+        .name("Text Confidence Threshold")
+        .displayName("Text Confidence Threshold")
+        .description("Specifies the minimum confidence that Textract must have in order for a line of text to be included in the attribute specified by the <Text Attribute Name> property")
+        .required(false)
+        .addValidator(StandardValidators.createLongValidator(1, 100, true))
+        .dependsOn(TEXT_ATTRIBUTE_NAME)
+        .defaultValue("70")
+        .build();
+    /** Maximum length (1-16384, default 4096) of the text attribute; longer text is truncated. */
+    static final PropertyDescriptor MAX_CHARACTER_COUNT = new PropertyDescriptor.Builder()
+        .name("Max Character Count")
+        .displayName("Max Character Count")
+        .description("If the number of characters in the text exceed this threshold, the text will be truncated so as not to create a very large attribute.")
+        .required(false)
+        .addValidator(StandardValidators.createLongValidator(1, 16384, true))
+        .defaultValue("4096")
+        .dependsOn(TEXT_ATTRIBUTE_NAME)
+        .build();
+
+    private static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(Arrays.asList(
+        ACTION,
+        JOB_ID,
+        AWS_CREDENTIALS_PROVIDER_SERVICE,
+        TEXT_ATTRIBUTE_NAME,
+        TEXT_CONFIDENCE_THRESHOLD,
+        MAX_CHARACTER_COUNT,
+        TextractProperties.LINE_ITEM_ATTRIBUTE_PREFIX,
+        TextractProperties.SUMMARY_ITEM_ATTRIBUTE_PREFIX,
+        REGION,
+        TIMEOUT,
+        ENDPOINT_OVERRIDE,
+        PROXY_HOST,
+        PROXY_HOST_PORT,
+        PROXY_USERNAME,
+        PROXY_PASSWORD));
+
+    // Relationships
+    /** Receives the incoming FlowFile once the job has finished (successfully or partially); auto-terminated by default. */
+    private static final Relationship REL_ORIGINAL = new Relationship.Builder()
+        .name("original")
+        .description("Upon successful completion, the original FlowFile will be routed to this relationship while the results of the Textract job will be routed to the 'success' or 'partial success' relationship")
+        .autoTerminateDefault(true)
+        .build();
+    /** Receives the JSON results of a job that Textract reports as only partially successful. */
+    private static final Relationship REL_PARTIAL_SUCCESS = new Relationship.Builder()
+        .name("partial success")
+        .description("The Textract job has completed but was only partially successful")
+        .build();
+    /** Receives (penalized) incoming FlowFiles whose job is still running, so they can be retried later. */
+    private static final Relationship REL_IN_PROGRESS = new Relationship.Builder()
+        .name("in progress")
+        .description("The Textract job is currently still being processed")
+        .build();
+    /** Receives FlowFiles whose fetch failed for a reason that is expected to clear up on retry. */
+    static final Relationship REL_TRANSIENT_FAILURE = new Relationship.Builder()
+        .name("transient failure")
+        .description("Retrieving the AWS Textract results failed for some reason, but the issue is likely to resolve on its own, such as Provisioned Throughput Exceeded or a Throttling failure. " +
+            "It is generally expected to retry this relationship.")
+        .build();
+
+    private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+        REL_ORIGINAL,
+        REL_SUCCESS,
+        REL_PARTIAL_SUCCESS,
+        REL_IN_PROGRESS,
+        REL_TRANSIENT_FAILURE,
+        REL_FAILURE
+    )));
+
+    /** Holds the lazily created ObjectMapper used to serialize results; populated by getObjectMapper(). */
+    private final AtomicReference<ObjectMapper> objectMapperReference = new AtomicReference<>();
+
+
+    /** Returns the unmodifiable list of properties supported by this processor. */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    /**
+     * Returns the unmodifiable set of relationships: original, success, partial success, in progress,
+     * transient failure and failure.
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * Builds the asynchronous Textract client from the configured region, credentials provider and client
+     * configuration. When an Endpoint Override URL is configured, it is used in place of the region's default endpoint.
+     *
+     * @param context             the process context supplying REGION and ENDPOINT_OVERRIDE
+     * @param credentialsProvider provider of the AWS credentials used to sign requests
+     * @param clientConfig        proxy/timeout configuration for the client
+     * @return the newly built client
+     */
+    @Override
+    protected AmazonTextractAsyncClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration clientConfig) {
+        final String region = context.getProperty(REGION).getValue();
+        final String endpointOverride = context.getProperty(ENDPOINT_OVERRIDE).evaluateAttributeExpressions().getValue();
+
+        final AmazonTextractAsyncClientBuilder clientBuilder = AmazonTextractAsyncClientBuilder.standard();
+        // isInitializeRegionAndEndpoint() returns false, so the base class presumably never applies the endpoint
+        // override to this client; apply it here. The SDK builder does not accept both a region and an endpoint
+        // configuration, so exactly one of them is set.
+        if (endpointOverride == null || endpointOverride.trim().isEmpty()) {
+            clientBuilder.setRegion(region);
+        } else {
+            clientBuilder.setEndpointConfiguration(new AmazonTextractAsyncClientBuilder.EndpointConfiguration(endpointOverride.trim(), region));
+        }
+        clientBuilder.setCredentials(credentialsProvider);
+        clientBuilder.setClientConfiguration(clientConfig);
+        final AmazonTextractAsync client = clientBuilder.build();
+        return (AmazonTextractAsyncClient) client;
+    }
+
+    /**
+     * Returns false: createClient(...) sets the region on the client builder itself, so the base class should not try
+     * to initialize region/endpoint on the already-built client (presumably because a builder-created client cannot be
+     * reconfigured afterwards — confirm against AbstractAWSProcessor).
+     */
+    @Override
+    protected boolean isInitializeRegionAndEndpoint() {
+        return false;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final String actionName = TextractProperties.getAction(context, 
flowFile);
+        final String jobId = 
context.getProperty(JOB_ID).evaluateAttributeExpressions(flowFile).getValue();
+
+        final JobResultsFetcher jobResults = getJobResults(actionName);
+        if (jobResults == null) {
+            getLogger().error("Cannot analyze text from {} because the 
Textract Action provided is invalid: {}", flowFile, actionName);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        try {
+            jobResults.processFlowFile(context, session, flowFile, jobId);
+
+            final String transitUri = "https://textract."; + 
context.getProperty(REGION).getValue() + ".amazonaws.com/jobs/" + jobId;
+            
session.getProvenanceReporter().invokeRemoteProcess(jobResults.getCreatedFlowFile(),
 transitUri);
+        } catch (final InternalServerErrorException | ThrottlingException | 
ProvisionedThroughputExceededException e) {
+            getLogger().error("Failed to retrieve results for {}. Routing to 
{}", flowFile, REL_TRANSIENT_FAILURE.getName(), e);
+            flowFile = session.putAttribute(flowFile, 
FAILURE_REASON_ATTRIBUTE, e.getMessage());
+            session.transfer(flowFile, REL_TRANSIENT_FAILURE);
+
+            final FlowFile created = jobResults.getCreatedFlowFile();
+            if (created != null) {
+                session.remove(created);
+            }
+
+            context.yield();
+        } catch (final Throwable t) {
+            getLogger().error("Failed to retrieve results for {}. Routing to 
{}", flowFile, REL_FAILURE.getName(), t);
+            flowFile = session.putAttribute(flowFile, 
FAILURE_REASON_ATTRIBUTE, t.getMessage());
+            session.transfer(flowFile, REL_FAILURE);
+
+            final FlowFile created = jobResults.getCreatedFlowFile();
+            if (created != null) {
+                session.remove(created);
+            }
+        }
+    }
+
+    @Override
+    protected AmazonTextractAsyncClient createClient(final ProcessContext 
context, final AWSCredentials credentials, final ClientConfiguration config) {
+        return null;
+    }
+
+    /**
+     * Returns a new fetcher for the given Textract action, matched case-insensitively against
+     * TextractProperties.DETECT_TEXT, ANALYZE_DOCUMENT and ANALYZE_EXPENSE.
+     *
+     * @param actionName the action name, possibly null
+     * @return the matching fetcher, or {@code null} if the action is missing or not recognized
+     */
+    private JobResultsFetcher getJobResults(final String actionName) {
+        // A missing action yields null so that onTrigger routes the FlowFile to failure instead of throwing an NPE here.
+        if (actionName == null) {
+            return null;
+        }
+        if (actionName.equalsIgnoreCase(TextractProperties.DETECT_TEXT.getValue())) {
+            return new DetectDocumentResultsFetcher();
+        }
+        if (actionName.equalsIgnoreCase(TextractProperties.ANALYZE_DOCUMENT.getValue())) {
+            return new AnalyzeDocumentResultsFetcher();
+        }
+        if (actionName.equalsIgnoreCase(TextractProperties.ANALYZE_EXPENSE.getValue())) {
+            return new AnalyzeExpenseResultsFetcher();
+        }
+
+        return null;
+    }
+
+    /** Retrieves the results of one kind of Textract job for a single incoming FlowFile. */
+    private interface JobResultsFetcher {
+        /**
+         * Process the given FlowFile, creating a new child FlowFile that contains the results of the Textract job.
+         * <ul>
+         *   <li>If the Textract job is still being processed, the given FlowFile should be transferred to
+         *       {@link FetchTextractResults#REL_IN_PROGRESS}.</li>
+         *   <li>If the Textract job has failed, the given FlowFile should be transferred to {@code REL_FAILURE}.</li>
+         *   <li>If the Textract job has partially succeeded, the given FlowFile should be transferred to
+         *       {@link FetchTextractResults#REL_ORIGINAL} and the result transferred to
+         *       {@link FetchTextractResults#REL_PARTIAL_SUCCESS}.</li>
+         *   <li>If the Textract job has succeeded, the given FlowFile should be transferred to
+         *       {@link FetchTextractResults#REL_ORIGINAL} and the result transferred to {@code REL_SUCCESS}.</li>
+         * </ul>
+         * If any Throwable is thrown, the given FlowFile must not be transferred anywhere, and any FlowFile that was
+         * created must be made available via the {@link #getCreatedFlowFile()} method so that the caller can remove it.
+         *
+         * @param context  the process context
+         * @param session  the session owning the FlowFile
+         * @param flowFile the incoming FlowFile
+         * @param jobId    the ID of the Textract job whose results are fetched
+         * @throws Throwable any failure, including the unwrapped cause of a failed asynchronous AWS call
+         */
+        void processFlowFile(ProcessContext context, ProcessSession session, FlowFile flowFile, String jobId) throws Throwable;
+
+        /** @return the result FlowFile created by {@link #processFlowFile}, or {@code null} if none was created */
+        FlowFile getCreatedFlowFile();
+    }
+
+    private ObjectMapper getObjectMapper() {
+        // Lazily initialize the ObjectMapper.
+        ObjectMapper mapper = objectMapperReference.get();
+        if (mapper != null) {
+            return mapper;
+        }
+
+        mapper = new ObjectMapper();
+        objectMapperReference.compareAndSet(null, mapper);
+        return mapper;
+    }
+
+    /**
+     * Writes the given values to {@code out} as a single JSON array. The stream is wrapped so that closing the
+     * generator does not close {@code out}, which remains owned by the caller.
+     *
+     * @param values the objects to serialize, in iteration order
+     * @param out    the destination stream; left open
+     * @throws IOException if serialization or writing fails
+     */
+    private void writeAsJson(final Collection<?> values, final OutputStream out) throws IOException {
+        // Reuse the mapper's own factory (whose codec is the mapper) instead of allocating a new JsonFactory per call.
+        final JsonFactory jsonFactory = getObjectMapper().getFactory();
+
+        try (final OutputStream blockingOut = new NonCloseableOutputStream(out);
+             final JsonGenerator jsonGenerator = jsonFactory.createGenerator(blockingOut)) {
+            jsonGenerator.writeStartArray();
+            for (final Object value : values) {
+                jsonGenerator.writeObject(value);
+            }
+            jsonGenerator.writeEndArray();
+            jsonGenerator.flush();
+        }
+    }
+
+    /**
+     * Processing of the Text Detection and Text Analysis are basically 
identical, but the AWS API provides a different class for each,
+     * with methods that have the same signature. Because of this, we cannot 
process them both using the same methods. As a result, we have
+     * an abstract implementation that is responsible for handling all 
processing with getter methods that transform the Response object into the
+     * objects that we want to retrieve from the response.
+     */
+    private abstract class DetectionAnalysisResultsFetcher<T> implements 
JobResultsFetcher {
+        private FlowFile createdFlowFile = null;
+
+        @Override
+        public FlowFile getCreatedFlowFile() {
+            return createdFlowFile;
+        }
+
+        public abstract Future<T> makeRequest(ProcessContext context, FlowFile 
flowFile, String jobId, String nextToken);
+
+        public abstract String getJobStatus(T response);
+        public abstract String getStatusMessage(T response);
+        public abstract List<Block> getBlocks(T response);
+        public abstract List<Warning> getWarnings(T response);
+        public abstract DocumentMetadata getDocumentMetadata(T response);
+        public abstract String getModelVersion(T response);
+        public abstract String getNextToken(T response);
+
+        @Override
+        public void processFlowFile(final ProcessContext context, final 
ProcessSession session, final FlowFile flowFile, final String jobId) throws 
Throwable {
+            T result;
+            try {
+                result = makeRequest(context, flowFile, jobId, null).get();
+            } catch (ExecutionException e) {
+                throw e.getCause();
+            }
+
+            final JobStatus jobStatus = 
JobStatus.fromValue(getJobStatus(result));
+
+            if (jobStatus == JobStatus.IN_PROGRESS) {
+                session.penalize(flowFile);
+                session.transfer(flowFile, REL_IN_PROGRESS);
+                return;
+            }
+
+            if (jobStatus == JobStatus.FAILED) {
+                final String failureReason = getStatusMessage(result);
+                session.putAttribute(flowFile, FAILURE_REASON_ATTRIBUTE, 
failureReason);
+                session.transfer(flowFile, REL_FAILURE);
+                getLogger().error("Textract reported that the task failed for 
{}: {}", flowFile, failureReason);
+                return;
+            }
+
+            final Relationship relationship;
+            if (jobStatus == JobStatus.PARTIAL_SUCCESS) {
+                relationship = REL_PARTIAL_SUCCESS;
+            } else {
+                relationship = REL_SUCCESS;
+            }
+
+            createdFlowFile = session.create(flowFile);
+
+            int blockCount = getBlocks(result).size();
+            final List<Warning> warnings = new 
ArrayList<>(getWarnings(result));
+            final Map<String, String> attributes = new HashMap<>();
+
+            final DocumentMetadata metadata = getDocumentMetadata(result);
+            final Integer pages = metadata.getPages();
+            if (pages != null) {
+                attributes.put("num.pages", pages.toString());
+            }
+
+            attributes.put("aws.model.version", getModelVersion(result));
+
+            final String textAttributeName = 
context.getProperty(TEXT_ATTRIBUTE_NAME).getValue();
+            final Integer confidenceThreshold = 
context.getProperty(TEXT_CONFIDENCE_THRESHOLD).asInteger();
+            final Integer maxChars = 
context.getProperty(MAX_CHARACTER_COUNT).asInteger();
+
+            try (final OutputStream out = session.write(createdFlowFile)) {
+                List<Block> blocks = getBlocks(result);
+                writeAsJson(blocks, out);
+
+                final StringBuilder textBuilder = new StringBuilder();
+                if (textAttributeName != null) {
+                    final String text = 
TextractProperties.lineBlocksToString(blocks, confidenceThreshold, attributes);
+                    textBuilder.append(text);
+                    if (textBuilder.length() > maxChars) {
+                        textBuilder.setLength(maxChars);
+                    }
+                }
+
+                while (getNextToken(result) != null) {
+                    final String nextToken = getNextToken(result);
+
+                    final GetDocumentTextDetectionRequest nextRequest = new 
GetDocumentTextDetectionRequest();
+                    nextRequest.setJobId(jobId);
+                    nextRequest.setNextToken(nextToken);
+
+                    try {
+                        result = makeRequest(context, flowFile, jobId, 
nextToken).get();
+                    } catch (final ExecutionException ee) {
+                        throw ee.getCause();
+                    }
+
+                    blocks = getBlocks(result);
+                    blockCount += blocks.size();
+                    warnings.addAll(getWarnings(result));
+                    writeAsJson(blocks, out);
+
+                    if (textAttributeName != null) {
+                        final String text = 
TextractProperties.lineBlocksToString(blocks, confidenceThreshold, attributes);
+                        textBuilder.append(text);
+                        if (textBuilder.length() > maxChars) {
+                            textBuilder.setLength(maxChars);
+                        }
+                    }
+                }
+
+                if (textAttributeName != null) {
+                    attributes.put(textAttributeName, textBuilder.toString());
+                }
+            }
+
+            int warningIndex=0;
+            for (final Warning warning : warnings) {
+                final String attributePrefix = "textract.warnings." + 
warningIndex + ".";
+                attributes.put(attributePrefix + "errorCode", 
warning.getErrorCode());
+                attributes.put(attributePrefix + "affected.pages", 
pagesToString(warning.getPages()));
+                warningIndex++;
+            }
+
+            attributes.put("num.text.blocks", String.valueOf(blockCount));
+
+            // On Success, transfer original & created FlowFiles
+            session.putAllAttributes(flowFile, attributes);
+            session.transfer(flowFile, REL_ORIGINAL);
+
+            attributes.put(CoreAttributes.MIME_TYPE.key(), JSON_MIME_TYPE);
+            session.putAllAttributes(createdFlowFile, attributes);
+            session.transfer(createdFlowFile, relationship);
+        }
+
+        private String pagesToString(final List<Integer> pages) {
+            if (pages.isEmpty()) {
+                return "";
+            }
+            if (pages.size() == 1) {
+                return String.valueOf(pages.get(0));
+            }
+
+            final StringBuilder sb = new StringBuilder();
+            int i=0;
+            for (final Integer page : pages) {
+                if (i++ > 0) {
+                    sb.append(", ");
+                }
+
+                sb.append(page);
+            }
+
+            return sb.toString();
+        }
+    }
+
+    private class DetectDocumentResultsFetcher extends 
DetectionAnalysisResultsFetcher<GetDocumentTextDetectionResult> {
+
+        @Override
+        public Future<GetDocumentTextDetectionResult> makeRequest(final 
ProcessContext context, final FlowFile flowFile, final String jobId, final 
String nextToken) {
+            final GetDocumentTextDetectionRequest request = new 
GetDocumentTextDetectionRequest();
+            request.setJobId(jobId);
+            request.setNextToken(nextToken);
+
+            return client.getDocumentTextDetectionAsync(request);

Review Comment:
   This processor also throws a NullPointerException when the 
AWSCredentialsProviderControllerService is not set.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/textract/InvokeTextract.java:
##########
@@ -0,0 +1,565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.textract;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.AmazonTextractAsync;
+import com.amazonaws.services.textract.AmazonTextractAsyncClient;
+import com.amazonaws.services.textract.AmazonTextractAsyncClientBuilder;
+import com.amazonaws.services.textract.model.AnalyzeDocumentRequest;
+import com.amazonaws.services.textract.model.AnalyzeDocumentResult;
+import com.amazonaws.services.textract.model.AnalyzeExpenseRequest;
+import com.amazonaws.services.textract.model.AnalyzeExpenseResult;
+import com.amazonaws.services.textract.model.AnalyzeIDRequest;
+import com.amazonaws.services.textract.model.AnalyzeIDResult;
+import com.amazonaws.services.textract.model.BadDocumentException;
+import com.amazonaws.services.textract.model.Block;
+import com.amazonaws.services.textract.model.DetectDocumentTextRequest;
+import com.amazonaws.services.textract.model.DetectDocumentTextResult;
+import com.amazonaws.services.textract.model.Document;
+import com.amazonaws.services.textract.model.DocumentMetadata;
+import com.amazonaws.services.textract.model.DocumentTooLargeException;
+import com.amazonaws.services.textract.model.ExpenseDocument;
+import com.amazonaws.services.textract.model.ExpenseField;
+import com.amazonaws.services.textract.model.HumanLoopQuotaExceededException;
+import com.amazonaws.services.textract.model.IdentityDocument;
+import com.amazonaws.services.textract.model.IdentityDocumentField;
+import com.amazonaws.services.textract.model.InternalServerErrorException;
+import com.amazonaws.services.textract.model.InvalidParameterException;
+import com.amazonaws.services.textract.model.InvalidS3ObjectException;
+import com.amazonaws.services.textract.model.LineItemFields;
+import com.amazonaws.services.textract.model.LineItemGroup;
+import 
com.amazonaws.services.textract.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.textract.model.ThrottlingException;
+import com.amazonaws.services.textract.model.UnsupportedDocumentException;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+import 
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@SideEffectFree
+@CapabilityDescription("Sends the contents of the FlowFile to the AWS Textract 
service in order to perform text-related extraction and analysis on an image. " 
+
+    "Upon successful completion, the original FlowFile will be updated with 
any relevant attributes, such as the raw extracted text. A second FlowFile will 
be " +

Review Comment:
   The description of the 'original' relationship says "Upon successful 
analysis, the original FlowFile will be routed to the 'original' relationship 
while the results of the analysis are routed to the 'analysis' relationship". 
This contradicts the CapabilityDescription above, which says the original 
FlowFile is updated with attributes such as the raw extracted text. I've also 
tested it: the original FlowFile did not contain the extracted text, but the 
FlowFile routed to 'analysis' did. Could you verify that I'm right and update 
the description if needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to