exceptionfactory commented on code in PR #6589: URL: https://github.com/apache/nifi/pull/6589#discussion_r1086202483
########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMachineLearningJobStarter.java: ########## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.aws.ml; + +import static org.apache.nifi.flowfile.attributes.CoreAttributes.MIME_TYPE; +import static org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor.TASK_ID; + +import com.amazonaws.AmazonWebServiceClient; +import com.amazonaws.AmazonWebServiceRequest; +import com.amazonaws.AmazonWebServiceResult; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.regions.Regions; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.components.PropertyDescriptor; +import 
org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; + +public abstract class AwsMachineLearningJobStarter<T extends AmazonWebServiceClient, REQUEST extends AmazonWebServiceRequest, RESPONSE extends AmazonWebServiceResult> + extends AbstractAWSCredentialsProviderProcessor<T> { + public static final PropertyDescriptor JSON_PAYLOAD = new PropertyDescriptor.Builder() + .name("json-payload") + .displayName("JSON Payload") + .description("JSON request for AWS Machine Learning services. The Processor will use FlowFile content for the request when this property is not specified.") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE = + new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE) + .required(true) + .build(); + public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder() + .displayName("Region") + .name("aws-region") + .required(true) + .allowableValues(getAvailableRegions()) + .defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue()) + .build(); + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("Upon successful completion, the original FlowFile will be routed to this relationship.") + .autoTerminateDefault(true) + .build(); + protected static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( + JSON_PAYLOAD, + 
MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE, + REGION, + TIMEOUT, + SSL_CONTEXT_SERVICE, + ENDPOINT_OVERRIDE)); + private final static ObjectMapper MAPPER = JsonMapper.builder() + .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true) + .build(); + private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + REL_ORIGINAL, + REL_SUCCESS, + REL_FAILURE + ))); + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == null && !context.getProperty(JSON_PAYLOAD).isSet()) { + return; + } + final RESPONSE response; + FlowFile childFlowFile; + try { + response = sendRequest(buildRequest(session, context, flowFile), context, flowFile); + childFlowFile = writeToFlowFile(session, flowFile, response); + postProcessFlowFile(context, session, childFlowFile, response); + session.transfer(childFlowFile, REL_SUCCESS); + } catch (Exception e) { + if (flowFile != null) { + session.transfer(flowFile, REL_FAILURE); + } + getLogger().error("Sending AWS ML Request failed", e); + return; + } + if (flowFile != null) { + session.transfer(flowFile, REL_ORIGINAL); + } + + } + + protected void postProcessFlowFile(ProcessContext context, ProcessSession session, FlowFile flowFile, RESPONSE response) { + flowFile = session.putAttribute(flowFile, TASK_ID.getName(), getAwsTaskId(context, response, flowFile)); + flowFile = session.putAttribute(flowFile, MIME_TYPE.key(), "application/json"); + getLogger().debug("AWS ML task has been started with task id: {}", getAwsTaskId(context, response, flowFile)); Review Comment: ```suggestion final String awsTaskId = getAwsTaskId(context, response, flowFile); flowFile = session.putAttribute(flowFile, 
TASK_ID.getName(), awsTaskId); flowFile = session.putAttribute(flowFile, MIME_TYPE.key(), "application/json"); getLogger().debug("AWS ML Task [{}] started", awsTaskId); ``` ########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/GetAwsPollyJobStatus.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.aws.ml.polly; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.polly.AmazonPollyClient; +import com.amazonaws.services.polly.AmazonPollyClientBuilder; +import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskRequest; +import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskResult; +import com.amazonaws.services.polly.model.TaskStatus; +import com.amazonaws.services.textract.model.ThrottlingException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor; + +@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"}) +@CapabilityDescription("Retrieves the current status of an AWS Polly job.") +@SeeAlso({StartAwsPollyJob.class}) +@WritesAttributes({ + @WritesAttribute(attribute = "PollyS3OutputBucket", description = "The bucket name where polly output will be located."), + @WritesAttribute(attribute = "PollyS3OutputKey", description = "Object key of polly output."), + @WritesAttribute(attribute = "outputLocation", description = "S3 path-style output location of the result.") +}) +public class GetAwsPollyJobStatus extends AwsMachineLearningJobStatusProcessor<AmazonPollyClient> { + private static final String BUCKET = "bucket"; + private static final String KEY = "key"; + private static final Pattern S3_PATH = 
Pattern.compile("https://s3.*amazonaws.com/(?<" + BUCKET + ">[^/]+)/(?<" + KEY + ">.*)"); + private static final String AWS_S3_BUCKET = "PollyS3OutputBucket"; + private static final String AWS_S3_KEY = "filename"; + + @Override + protected AmazonPollyClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) { + return (AmazonPollyClient) AmazonPollyClientBuilder.standard() + .withCredentials(credentialsProvider) + .withRegion(context.getProperty(REGION).getValue()) + .build(); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + GetSpeechSynthesisTaskResult speechSynthesisTask; + try { + speechSynthesisTask = getSynthesisTask(context, flowFile); + } catch (ThrottlingException e) { + getLogger().info("Request Rate Limit exceeded", e); + session.transfer(flowFile, REL_THROTTLED); + return; + } catch (Exception e) { + getLogger().warn("Failed to get Polly Job status", e); + session.transfer(flowFile, REL_FAILURE); + return; + } + + TaskStatus taskStatus = TaskStatus.fromValue(speechSynthesisTask.getSynthesisTask().getTaskStatus()); + + if (taskStatus == TaskStatus.InProgress || taskStatus == TaskStatus.Scheduled) { + session.penalize(flowFile); + session.transfer(flowFile, REL_RUNNING); + } else if (taskStatus == TaskStatus.Completed) { + String outputUri = speechSynthesisTask.getSynthesisTask().getOutputUri(); + + Matcher matcher = S3_PATH.matcher(outputUri); + if (matcher.find()) { + session.putAttribute(flowFile, AWS_S3_BUCKET, matcher.group(BUCKET)); + session.putAttribute(flowFile, AWS_S3_KEY, matcher.group(KEY)); + } + FlowFile childFlowFile = session.create(flowFile); + writeToFlowFile(session, childFlowFile, speechSynthesisTask); + childFlowFile = session.putAttribute(childFlowFile, AWS_TASK_OUTPUT_LOCATION, outputUri); + session.transfer(flowFile, REL_ORIGINAL); 
+ session.transfer(childFlowFile, REL_SUCCESS); + getLogger().info("Amazon Polly reported that the task completed for {}", flowFile); Review Comment: ```suggestion getLogger().info("Amazon Polly Task Completed {}", flowFile); ``` ########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/GetAwsPollyJobStatus.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.aws.ml.polly; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.polly.AmazonPollyClient; +import com.amazonaws.services.polly.AmazonPollyClientBuilder; +import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskRequest; +import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskResult; +import com.amazonaws.services.polly.model.TaskStatus; +import com.amazonaws.services.textract.model.ThrottlingException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor; + +@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"}) +@CapabilityDescription("Retrieves the current status of an AWS Polly job.") +@SeeAlso({StartAwsPollyJob.class}) +@WritesAttributes({ + @WritesAttribute(attribute = "PollyS3OutputBucket", description = "The bucket name where polly output will be located."), + @WritesAttribute(attribute = "PollyS3OutputKey", description = "Object key of polly output."), + @WritesAttribute(attribute = "outputLocation", description = "S3 path-style output location of the result.") +}) +public class GetAwsPollyJobStatus extends AwsMachineLearningJobStatusProcessor<AmazonPollyClient> { + private static final String BUCKET = "bucket"; + private static final String KEY = "key"; + private static final Pattern S3_PATH = 
Pattern.compile("https://s3.*amazonaws.com/(?<" + BUCKET + ">[^/]+)/(?<" + KEY + ">.*)"); + private static final String AWS_S3_BUCKET = "PollyS3OutputBucket"; + private static final String AWS_S3_KEY = "filename"; + + @Override + protected AmazonPollyClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) { + return (AmazonPollyClient) AmazonPollyClientBuilder.standard() + .withCredentials(credentialsProvider) + .withRegion(context.getProperty(REGION).getValue()) + .build(); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + GetSpeechSynthesisTaskResult speechSynthesisTask; + try { + speechSynthesisTask = getSynthesisTask(context, flowFile); + } catch (ThrottlingException e) { + getLogger().info("Request Rate Limit exceeded", e); + session.transfer(flowFile, REL_THROTTLED); + return; + } catch (Exception e) { + getLogger().warn("Failed to get Polly Job status", e); + session.transfer(flowFile, REL_FAILURE); + return; + } + + TaskStatus taskStatus = TaskStatus.fromValue(speechSynthesisTask.getSynthesisTask().getTaskStatus()); + + if (taskStatus == TaskStatus.InProgress || taskStatus == TaskStatus.Scheduled) { + session.penalize(flowFile); + session.transfer(flowFile, REL_RUNNING); + } else if (taskStatus == TaskStatus.Completed) { + String outputUri = speechSynthesisTask.getSynthesisTask().getOutputUri(); + + Matcher matcher = S3_PATH.matcher(outputUri); + if (matcher.find()) { + session.putAttribute(flowFile, AWS_S3_BUCKET, matcher.group(BUCKET)); + session.putAttribute(flowFile, AWS_S3_KEY, matcher.group(KEY)); + } + FlowFile childFlowFile = session.create(flowFile); + writeToFlowFile(session, childFlowFile, speechSynthesisTask); + childFlowFile = session.putAttribute(childFlowFile, AWS_TASK_OUTPUT_LOCATION, outputUri); + session.transfer(flowFile, REL_ORIGINAL); 
+ session.transfer(childFlowFile, REL_SUCCESS); + getLogger().info("Amazon Polly reported that the task completed for {}", flowFile); + } else if (taskStatus == TaskStatus.Failed) { + final String failureReason = speechSynthesisTask.getSynthesisTask().getTaskStatusReason(); + flowFile = session.putAttribute(flowFile, FAILURE_REASON_ATTRIBUTE, failureReason); + session.transfer(flowFile, REL_FAILURE); + getLogger().error("Amazon Polly reported that the task failed for {}: {}", flowFile, failureReason); Review Comment: ```suggestion getLogger().error("Amazon Polly Task Failed {} Reason [{}]", flowFile, failureReason); ``` ########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/GetAwsTextractJobStatus.java: ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.aws.ml.textract; + +import static org.apache.nifi.processors.aws.ml.textract.TextractType.DOCUMENT_ANALYSIS; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.textract.AmazonTextractClient; +import com.amazonaws.services.textract.model.GetDocumentAnalysisRequest; +import com.amazonaws.services.textract.model.GetDocumentTextDetectionRequest; +import com.amazonaws.services.textract.model.GetExpenseAnalysisRequest; +import com.amazonaws.services.textract.model.JobStatus; +import com.amazonaws.services.textract.model.ThrottlingException; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor; + +@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Textract"}) +@CapabilityDescription("Retrieves the current status of an AWS Textract job.") +@SeeAlso({StartAwsTextractJob.class}) +public class GetAwsTextractJobStatus extends AwsMachineLearningJobStatusProcessor<AmazonTextractClient> { + public static final PropertyDescriptor TEXTRACT_TYPE = new PropertyDescriptor.Builder() + .name("textract-type") + .displayName("Textract Type") + .required(true) + .description("Supported values: \"Document Analysis\", \"Document Text Detection\", \"Expense Analysis\"") + 
.allowableValues(TextractType.TEXTRACT_TYPES) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .defaultValue(DOCUMENT_ANALYSIS.getType()) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + private static final List<PropertyDescriptor> TEXTRACT_PROPERTIES = + Collections.unmodifiableList(Stream.concat(PROPERTIES.stream(), Stream.of(TEXTRACT_TYPE)).collect(Collectors.toList())); + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return TEXTRACT_PROPERTIES; + } + + @Override + protected AmazonTextractClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) { + return (AmazonTextractClient) AmazonTextractClient.builder() + .withRegion(context.getProperty(REGION).getValue()) + .withCredentials(credentialsProvider) + .build(); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + String textractType = context.getProperty(TEXTRACT_TYPE).evaluateAttributeExpressions(flowFile).getValue(); + + String awsTaskId = context.getProperty(TASK_ID).evaluateAttributeExpressions(flowFile).getValue(); + try { + JobStatus jobStatus = getTaskStatus(TextractType.fromString(textractType), getClient(), awsTaskId); + if (JobStatus.SUCCEEDED == jobStatus) { + Object task = getTask(TextractType.fromString(textractType), getClient(), awsTaskId); + writeToFlowFile(session, flowFile, task); + session.transfer(flowFile, REL_SUCCESS); + } else if (JobStatus.IN_PROGRESS == jobStatus) { + session.transfer(flowFile, REL_RUNNING); + } else if (JobStatus.PARTIAL_SUCCESS == jobStatus) { + session.transfer(flowFile, REL_THROTTLED); + } else if (JobStatus.FAILED == jobStatus) { + session.transfer(flowFile, REL_FAILURE); + getLogger().error("Amazon Textract reported that the task failed for awsTaskId: {}", awsTaskId); Review 
Comment: ```suggestion getLogger().error("Amazon Textract Task [{}] Failed", awsTaskId); ``` ########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/transcribe/GetAwsTranscribeJobStatus.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.aws.ml.transcribe; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.textract.model.ThrottlingException; +import com.amazonaws.services.transcribe.AmazonTranscribeClient; +import com.amazonaws.services.transcribe.model.GetTranscriptionJobRequest; +import com.amazonaws.services.transcribe.model.GetTranscriptionJobResult; +import com.amazonaws.services.transcribe.model.TranscriptionJobStatus; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.aws.ml.AwsMachineLearningJobStatusProcessor; + +@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Transcribe"}) +@CapabilityDescription("Retrieves the current status of an AWS Transcribe job.") +@SeeAlso({StartAwsTranscribeJob.class}) +@WritesAttributes({ + @WritesAttribute(attribute = "outputLocation", description = "S3 path-style output location of the result.") +}) +public class GetAwsTranscribeJobStatus extends AwsMachineLearningJobStatusProcessor<AmazonTranscribeClient> { + @Override + protected AmazonTranscribeClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) { + return (AmazonTranscribeClient) AmazonTranscribeClient.builder() + .withRegion(context.getProperty(REGION).getValue()) + .withCredentials(credentialsProvider) + .build(); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + 
FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + try { + GetTranscriptionJobResult job = getJob(context, flowFile); + TranscriptionJobStatus jobStatus = TranscriptionJobStatus.fromValue(job.getTranscriptionJob().getTranscriptionJobStatus()); + + if (TranscriptionJobStatus.COMPLETED == jobStatus) { + writeToFlowFile(session, flowFile, job); + session.putAttribute(flowFile, AWS_TASK_OUTPUT_LOCATION, job.getTranscriptionJob().getTranscript().getTranscriptFileUri()); + session.transfer(flowFile, REL_SUCCESS); + } else if (TranscriptionJobStatus.IN_PROGRESS == jobStatus) { + session.transfer(flowFile, REL_RUNNING); + } else if (TranscriptionJobStatus.FAILED == jobStatus) { + final String failureReason = job.getTranscriptionJob().getFailureReason(); + session.putAttribute(flowFile, FAILURE_REASON_ATTRIBUTE, failureReason); + session.transfer(flowFile, REL_FAILURE); + getLogger().error("Transcribe Task Failed for {}: {}", flowFile, failureReason); Review Comment: ```suggestion getLogger().error("Transcribe Task Failed {} Reason [{}]", flowFile, failureReason); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
