This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 67925b171a NIFI-10953: Implement GCP Vision AI processors
67925b171a is described below

commit 67925b171aa4c629fca7c3771a43779425bfd8b2
Author: Kalman Jantner <[email protected]>
AuthorDate: Tue Dec 6 16:56:29 2022 +0100

    NIFI-10953: Implement GCP Vision AI processors
    
    This closes #6762.
    
    Signed-off-by: Tamas Palfy <[email protected]>
---
 .../nifi-gcp-bundle/nifi-gcp-processors/pom.xml    |  22 ++++
 .../gcp/vision/AbstractGcpVisionProcessor.java     |  85 +++++++++++++++
 ...bstractGetGcpVisionAnnotateOperationStatus.java | 114 +++++++++++++++++++
 .../vision/AbstractStartGcpVisionOperation.java    |  81 ++++++++++++++
 .../GetGcpVisionAnnotateFilesOperationStatus.java  |  41 +++++++
 .../GetGcpVisionAnnotateImagesOperationStatus.java |  41 +++++++
 .../StartGcpVisionAnnotateFilesOperation.java      |  94 ++++++++++++++++
 .../StartGcpVisionAnnotateImagesOperation.java     |  94 ++++++++++++++++
 .../services/org.apache.nifi.processor.Processor   |   6 +-
 .../additionalDetails.html                         |  34 ++++++
 .../additionalDetails.html                         |  34 ++++++
 .../additionalDetails.html                         | 104 ++++++++++++++++++
 .../vision-annotate-files.png                      | Bin 0 -> 297707 bytes
 .../additionalDetails.html                         | 105 ++++++++++++++++++
 .../vision-annotate-images.png                     | Bin 0 -> 291842 bytes
 ...tGcpVisionAnnotateFilesOperationStatusTest.java | 120 ++++++++++++++++++++
 ...GcpVisionAnnotateImagesOperationStatusTest.java | 121 +++++++++++++++++++++
 .../StartGcpVisionAnnotateFilesOperationTest.java  | 106 ++++++++++++++++++
 .../StartGcpVisionAnnotateImagesOperationTest.java | 100 +++++++++++++++++
 .../src/test/resources/vision/annotate-file.json   |  21 ++++
 .../src/test/resources/vision/annotate-image.json  |  19 ++++
 21 files changed, 1341 insertions(+), 1 deletion(-)

diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
index a70a98e046..8c5871ebf8 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
@@ -187,6 +187,28 @@
             <artifactId>jackson-core</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.cloud</groupId>
+            <artifactId>google-cloud-vision</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.bouncycastle</groupId>
+                    <artifactId>bcprov-jdk15on</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.bouncycastle</groupId>
+                    <artifactId>bcpkix-jdk15on</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/AbstractGcpVisionProcessor.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/AbstractGcpVisionProcessor.java
new file mode 100644
index 0000000000..4530ac3f8c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/AbstractGcpVisionProcessor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.vision;
+
+import static 
org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.vision.v1.ImageAnnotatorClient;
+import com.google.cloud.vision.v1.ImageAnnotatorSettings;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+/**
+ * Common base for the GCP Vision processors in this package.
+ *
+ * <p>It declares the shared {@code success} and {@code failure} relationships, exposes the GCP
+ * credentials controller-service property, and builds an {@link ImageAnnotatorClient} every time
+ * the processor is scheduled. This class does not close the client itself.
+ */
+public abstract class AbstractGcpVisionProcessor extends AbstractProcessor {
+    /** Name of the FlowFile attribute that carries the Vision operation identifier. */
+    public static final String GCP_OPERATION_KEY = "operationKey";
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles are routed to success relationship")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles are routed to failure relationship")
+            .build();
+
+    /** Read-only default relationship set: success and failure. */
+    protected static final Set<Relationship> relationships =
+            Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+    /** Read-only default property list: only the credentials controller service. */
+    protected static final List<PropertyDescriptor> properties =
+            Collections.singletonList(GCP_CREDENTIALS_PROVIDER_SERVICE);
+
+    /** Client created in {@link #onScheduled(ProcessContext)}; {@code null} until then. */
+    private ImageAnnotatorClient vision;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * Creates the Vision client from the credentials supplied by the configured
+     * {@link GCPCredentialsService}.
+     *
+     * @param context gives access to the configured controller service
+     * @throws ProcessException if the credentials cannot be obtained or the client cannot be built
+     */
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        final GCPCredentialsService credentialsService = context
+                .getProperty(GCP_CREDENTIALS_PROVIDER_SERVICE)
+                .asControllerService(GCPCredentialsService.class);
+        try {
+            final GoogleCredentials googleCredentials = credentialsService.getGoogleCredentials();
+            final ImageAnnotatorSettings settings = ImageAnnotatorSettings.newBuilder()
+                    .setCredentialsProvider(FixedCredentialsProvider.create(googleCredentials))
+                    .build();
+            vision = ImageAnnotatorClient.create(settings);
+        } catch (Exception e) {
+            getLogger().error("Failed to create vision client.", e);
+            throw new ProcessException("Failed to create vision client.", e);
+        }
+    }
+
+    /**
+     * @return the client built by the most recent {@link #onScheduled(ProcessContext)} call
+     */
+    protected ImageAnnotatorClient getVisionClient() {
+        return vision;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/AbstractGetGcpVisionAnnotateOperationStatus.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/AbstractGetGcpVisionAnnotateOperationStatus.java
new file mode 100644
index 0000000000..0a8db05d75
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/AbstractGetGcpVisionAnnotateOperationStatus.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.vision;
+
+import static 
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+
+import com.google.longrunning.Operation;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessageV3;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.util.JsonFormat;
+import com.google.rpc.Status;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+/**
+ * Base class for processors that look up a GCP Vision long-running operation by its key and
+ * route the incoming FlowFile according to the operation's state.
+ *
+ * <ul>
+ *   <li>done without error: the response is written as JSON to a new child FlowFile
+ *       ({@code success}) and the incoming FlowFile goes to {@code original};</li>
+ *   <li>not done yet: the incoming FlowFile goes to {@code running};</li>
+ *   <li>done with error, or any exception: the incoming FlowFile goes to {@code failure}.</li>
+ * </ul>
+ */
+public abstract class AbstractGetGcpVisionAnnotateOperationStatus extends AbstractGcpVisionProcessor {
+    public static final PropertyDescriptor OPERATION_KEY = new PropertyDescriptor.Builder()
+            .name("operationKey")
+            .displayName("GCP Operation Key")
+            .description("The unique identifier of the Vision operation.")
+            .defaultValue("${operationKey}")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final Relationship REL_RUNNING = new Relationship.Builder()
+            .name("running")
+            .description("The job is currently still being processed")
+            .build();
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("Upon successful completion, the original FlowFile will be routed to this relationship.")
+            .autoTerminateDefault(true)
+            .build();
+    /** Inherited properties followed by {@link #OPERATION_KEY}. */
+    private static final List<PropertyDescriptor> PROPERTIES =
+            Collections.unmodifiableList(Stream.concat(properties.stream(), Stream.of(OPERATION_KEY)).collect(Collectors.toList()));
+    // Upper-case name so this constant no longer hides the inherited protected 'relationships' field.
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_ORIGINAL,
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_RUNNING
+    )));
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    /**
+     * Fetches the operation named by {@link #OPERATION_KEY} and routes the FlowFile by its state.
+     *
+     * @param context supplies the operation key property (evaluated against the FlowFile)
+     * @param session session used to read, create and transfer FlowFiles
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        try {
+            String operationKey = context.getProperty(OPERATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
+            Operation operation = getVisionClient().getOperationsClient().getOperation(operationKey);
+            getLogger().info(operation.toString());
+            if (operation.getDone() && !operation.hasError()) {
+                GeneratedMessageV3 response = deserializeResponse(operation.getResponse().getValue());
+                // Serialise before creating the child: if this throws, no child FlowFile is left
+                // in the session without a relationship when the catch block routes to failure.
+                final byte[] json = JsonFormat.printer().print(response).getBytes(StandardCharsets.UTF_8);
+                FlowFile childFlowFile = session.create(flowFile);
+                // Keep the references returned by the session; they are the current versions.
+                childFlowFile = session.write(childFlowFile, out -> out.write(json));
+                childFlowFile = session.putAttribute(childFlowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
+                session.transfer(flowFile, REL_ORIGINAL);
+                session.transfer(childFlowFile, REL_SUCCESS);
+            } else if (!operation.getDone()) {
+                session.transfer(flowFile, REL_RUNNING);
+            } else {
+                Status error = operation.getError();
+                getLogger().error("Failed to execute vision operation. Error code: {}, Error message: {}", error.getCode(), error.getMessage());
+                session.transfer(flowFile, REL_FAILURE);
+            }
+        } catch (Exception e) {
+            getLogger().error("Fail to get GCP Vision operation's status", e);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+    /**
+     * Parses the raw bytes of a finished operation's response into the concrete message type.
+     *
+     * @param responseValue serialized response taken from the operation
+     * @return the parsed message, later printed as JSON
+     * @throws InvalidProtocolBufferException if the bytes cannot be parsed
+     */
+    protected abstract GeneratedMessageV3 deserializeResponse(ByteString responseValue) throws InvalidProtocolBufferException;
+}
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/AbstractStartGcpVisionOperation.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/AbstractStartGcpVisionOperation.java
new file mode 100644
index 0000000000..373d933b7e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/AbstractStartGcpVisionOperation.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.vision;
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.protobuf.util.JsonFormat;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+/**
+ * Base class for processors that start a GCP Vision operation from a JSON request.
+ *
+ * <p>The JSON comes from the subclass's payload property when that property is set, otherwise
+ * from the FlowFile content. The name of the started operation is stored in the
+ * {@code operationKey} attribute and the FlowFile is routed to {@code success}; any exception
+ * routes it to {@code failure}.
+ *
+ * @param <B> protobuf builder type of the request being started
+ */
+public abstract class AbstractStartGcpVisionOperation<B extends com.google.protobuf.GeneratedMessageV3.Builder<B>> extends AbstractGcpVisionProcessor {
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            // Without input the processor can only run from the payload property.
+            if (!context.getProperty(getJsonPayloadPropertyDescriptor()).isSet()) {
+                return;
+            }
+            flowFile = session.create();
+        }
+        try {
+            OperationFuture<?, ?> asyncResponse = startOperation(session, context, flowFile);
+            String operationName = asyncResponse.getName();
+            // Keep the reference returned by the session; it is the current version.
+            flowFile = session.putAttribute(flowFile, GCP_OPERATION_KEY, operationName);
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // Restore the interrupt flag so the caller can still observe the interruption.
+                Thread.currentThread().interrupt();
+            }
+            getLogger().error("Fail to start GCP Vision operation", e);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+    /**
+     * Closes the Vision client when the processor is stopped.
+     *
+     * @throws IOException kept in the signature for compatibility with existing overrides
+     */
+    @OnStopped
+    public void onStopped() throws IOException {
+        // The client is null when scheduling failed before it was created.
+        if (getVisionClient() != null) {
+            getVisionClient().close();
+        }
+    }
+
+    /**
+     * Parses the JSON request into a new builder and starts the operation.
+     *
+     * @param session  used to read the FlowFile content when the payload property is not set
+     * @param context  supplies the payload property
+     * @param flowFile FlowFile whose attributes and/or content provide the request
+     * @return the future of the started operation
+     * @throws ProcessException if the JSON cannot be read or parsed
+     */
+    protected OperationFuture<?, ?> startOperation(ProcessSession session, ProcessContext context, FlowFile flowFile) {
+        B builder = newBuilder();
+        InputStream inStream = context.getProperty(getJsonPayloadPropertyDescriptor()).isSet()
+                ? getInputStreamFromProperty(context, flowFile) : session.read(flowFile);
+        try (InputStream inputStream = inStream) {
+            // Decode explicitly as UTF-8: the property path encodes with UTF-8, and relying on
+            // the platform default charset would corrupt non-ASCII JSON on other platforms.
+            JsonFormat.parser().ignoringUnknownFields().merge(new InputStreamReader(inputStream, StandardCharsets.UTF_8), builder);
+        } catch (final IOException e) {
+            throw new ProcessException("Read FlowFile Failed", e);
+        }
+        return startOperation(builder);
+    }
+
+    /** Evaluates the payload property against the FlowFile and exposes it as a UTF-8 stream. */
+    private InputStream getInputStreamFromProperty(ProcessContext context, FlowFile flowFile) {
+        return new ByteArrayInputStream(context.getProperty(getJsonPayloadPropertyDescriptor()).evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8));
+    }
+
+    /** @return an empty request builder that the JSON payload is merged into */
+    abstract B newBuilder();
+
+    /** @return the future of the operation started from the populated builder */
+    abstract OperationFuture<?, ?> startOperation(B builder);
+
+    /** @return the descriptor of the property that may hold the JSON request */
+    abstract PropertyDescriptor getJsonPayloadPropertyDescriptor();
+}
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/GetGcpVisionAnnotateFilesOperationStatus.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/GetGcpVisionAnnotateFilesOperationStatus.java
new file mode 100644
index 0000000000..ab51f012ce
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/GetGcpVisionAnnotateFilesOperationStatus.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.vision;
+
+import com.google.cloud.vision.v1p2beta1.AsyncBatchAnnotateFilesResponse;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessageV3;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+
+/**
+ * Status processor for file-annotation operations: parses the finished operation's response
+ * bytes as an {@code AsyncBatchAnnotateFilesResponse}.
+ */
+@Tags({"Google", "Cloud", "Vision", "Machine Learning"})
+@CapabilityDescription("Retrieves the current status of a Google Vision operation.")
+@SeeAlso({StartGcpVisionAnnotateFilesOperation.class})
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "operationKey", description = "A unique identifier of the operation designated by the Vision server.")
+})
+public class GetGcpVisionAnnotateFilesOperationStatus extends AbstractGetGcpVisionAnnotateOperationStatus {
+    /**
+     * @param responseValue serialized response of the finished operation
+     * @return the parsed files response
+     * @throws InvalidProtocolBufferException if the bytes cannot be parsed
+     */
+    @Override
+    protected GeneratedMessageV3 deserializeResponse(ByteString responseValue) throws InvalidProtocolBufferException {
+        // NOTE(review): this file imports the response class from v1p2beta1, while the images
+        // counterpart and the start processor use the v1 package - confirm this is intended.
+        return AsyncBatchAnnotateFilesResponse.parseFrom(responseValue);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/GetGcpVisionAnnotateImagesOperationStatus.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/GetGcpVisionAnnotateImagesOperationStatus.java
new file mode 100644
index 0000000000..75d657b05f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/GetGcpVisionAnnotateImagesOperationStatus.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.vision;
+
+import com.google.cloud.vision.v1.AsyncBatchAnnotateImagesResponse;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessageV3;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+
+/**
+ * Status processor for image-annotation operations: parses the finished operation's response
+ * bytes as an {@link AsyncBatchAnnotateImagesResponse}.
+ */
+@Tags({"Google", "Cloud", "Vision", "Machine Learning"})
+@CapabilityDescription("Retrieves the current status of a Google Vision operation.")
+@SeeAlso({StartGcpVisionAnnotateImagesOperation.class})
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "operationKey", description = "A unique identifier of the operation designated by the Vision server.")
+})
+public class GetGcpVisionAnnotateImagesOperationStatus extends AbstractGetGcpVisionAnnotateOperationStatus {
+    /**
+     * @param responseValue serialized response of the finished operation
+     * @return the parsed images response
+     * @throws InvalidProtocolBufferException if the bytes cannot be parsed
+     */
+    @Override
+    protected GeneratedMessageV3 deserializeResponse(ByteString responseValue) throws InvalidProtocolBufferException {
+        return AsyncBatchAnnotateImagesResponse.parseFrom(responseValue);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/StartGcpVisionAnnotateFilesOperation.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/StartGcpVisionAnnotateFilesOperation.java
new file mode 100644
index 0000000000..5c7014f722
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/StartGcpVisionAnnotateFilesOperation.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.vision;
+
+import static 
org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.vision.v1.AsyncBatchAnnotateFilesRequest;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+/**
+ * Starts an asynchronous batch file-annotation operation from a JSON request taken from the
+ * {@link #JSON_PAYLOAD} property or, when that is not set, from the FlowFile content.
+ */
+@Tags({"Google", "Cloud", "Machine Learning", "Vision"})
+@CapabilityDescription("Trigger a Vision operation on file input. It should be followed by GetGcpVisionAnnotateFilesOperationStatus processor in order to monitor operation status.")
+@SeeAlso({GetGcpVisionAnnotateFilesOperationStatus.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = "operationKey", description = "A unique identifier of the operation returned by the Vision server.")
+})
+public class StartGcpVisionAnnotateFilesOperation extends AbstractStartGcpVisionOperation<AsyncBatchAnnotateFilesRequest.Builder> {
+    // The description previously referred to "AWS Machine Learning services"; this processor
+    // sends the request to Google Cloud Vision.
+    public static final PropertyDescriptor JSON_PAYLOAD = new PropertyDescriptor.Builder()
+            .name("json-payload")
+            .displayName("JSON Payload")
+            .description("JSON request for the Google Cloud Vision service. The Processor will use FlowFile content for the request when this property is not specified.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .defaultValue("{\n" +
+                    "    \"requests\": [\n" +
+                    "        {\n" +
+                    "            \"inputConfig\": {\n" +
+                    "                \"gcsSource\": {\n" +
+                    "                    \"uri\": \"gs://${gcs.bucket}/${filename}\"\n" +
+                    "                },\n" +
+                    "                \"mimeType\": \"application/pdf\"\n" +
+                    "            },\n" +
+                    "            \"features\": [{\n" +
+                    "                    \"type\": \"DOCUMENT_TEXT_DETECTION\",\n" +
+                    "                    \"maxResults\": 4\n" +
+                    "                }],\n" +
+                    "            \"outputConfig\": {\n" +
+                    "                \"gcsDestination\": {\n" +
+                    "                    \"uri\": \"gs://${gcs.bucket}/${filename}/\"\n" +
+                    "                },\n" +
+                    "                \"batchSize\": 2\n" +
+                    "            }\n" +
+                    "        }]\n" +
+                    "}")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    /** Property order shown to the user: payload first, then the credentials service. */
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            JSON_PAYLOAD, GCP_CREDENTIALS_PROVIDER_SERVICE));
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    /** @return an empty files-request builder for the JSON payload to be merged into */
+    @Override
+    AsyncBatchAnnotateFilesRequest.Builder newBuilder() {
+        return AsyncBatchAnnotateFilesRequest.newBuilder();
+    }
+
+    /** Builds the request and submits it through the Vision client. */
+    @Override
+    OperationFuture<?, ?> startOperation(AsyncBatchAnnotateFilesRequest.Builder builder) {
+        return getVisionClient().asyncBatchAnnotateFilesAsync(builder.build());
+    }
+
+    @Override
+    PropertyDescriptor getJsonPayloadPropertyDescriptor() {
+        return JSON_PAYLOAD;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/StartGcpVisionAnnotateImagesOperation.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/StartGcpVisionAnnotateImagesOperation.java
new file mode 100644
index 0000000000..6d078ec69b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/StartGcpVisionAnnotateImagesOperation.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.vision;
+
+import static 
org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.vision.v1.AsyncBatchAnnotateImagesRequest;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@Tags({"Google", "Cloud", "Machine Learning", "Vision"})
+@CapabilityDescription("Trigger a Vision operation on image input. It should 
be followed by GetGcpVisionAnnotateImagesOperationStatus processor in order to 
monitor operation status.")
+@SeeAlso({GetGcpVisionAnnotateImagesOperationStatus.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = "operationKey", description = "A unique 
identifier of the operation returned by the Vision server.")
+})
+public class StartGcpVisionAnnotateImagesOperation extends 
AbstractStartGcpVisionOperation<AsyncBatchAnnotateImagesRequest.Builder> {
+    static final PropertyDescriptor JSON_PAYLOAD = new 
PropertyDescriptor.Builder()
+            .name("json-payload")
+            .displayName("JSON Payload")
+            .description("JSON request for the Google Cloud Vision service. The 
Processor will use FlowFile content for the request when this property is not 
specified.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("{\n" +
+                    "    \"requests\": [{\n" +
+                    "        \"image\": {\n" +
+                    "            \"source\": {\n" +
+                    "                \"imageUri\": 
\"gs://${gcs.bucket}/${filename}\"\n" +
+                    "            }\n" +
+                    "        },\n" +
+                    "        \"features\": [{\n" +
+                    "            \"type\": \"FACE_DETECTION\",\n" +
+                    "            \"maxResults\": 4\n" +
+                    "        }]\n" +
+                    "    }],\n" +
+                    "    \"outputConfig\": {\n" +
+                    "        \"gcsDestination\": {\n" +
+                    "            \"uri\": 
\"gs://${gcs.bucket}/${filename}/\"\n" +
+                    "        },\n" +
+                    "        \"batchSize\": 2\n" +
+                    "    }\n" +
+                    "}")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            JSON_PAYLOAD, GCP_CREDENTIALS_PROVIDER_SERVICE));
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    AsyncBatchAnnotateImagesRequest.Builder newBuilder() {
+        return AsyncBatchAnnotateImagesRequest.newBuilder();
+    }
+
+    @Override
+    OperationFuture<?, ?> 
startOperation(AsyncBatchAnnotateImagesRequest.Builder builder) {
+        return 
getVisionClient().asyncBatchAnnotateImagesAsync(builder.build());
+    }
+
+    @Override
+    PropertyDescriptor getJsonPayloadPropertyDescriptor() {
+        return JSON_PAYLOAD;
+    }
+
+}
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index e0edce5fd3..99a4df40a3 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -24,4 +24,8 @@ org.apache.nifi.processors.gcp.bigquery.PutBigQuery
 org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch
 org.apache.nifi.processors.gcp.bigquery.PutBigQueryStreaming
 org.apache.nifi.processors.gcp.drive.ListGoogleDrive
-org.apache.nifi.processors.gcp.drive.FetchGoogleDrive
\ No newline at end of file
+org.apache.nifi.processors.gcp.drive.FetchGoogleDrive
+org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateImagesOperation
+org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateFilesOperation
+org.apache.nifi.processors.gcp.vision.GetGcpVisionAnnotateImagesOperationStatus
+org.apache.nifi.processors.gcp.vision.GetGcpVisionAnnotateFilesOperationStatus
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.GetGcpVisionAnnotateFilesOperationStatus/additionalDetails.html
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.GetGcpVisionAnnotateFilesOperationStatus/additionalDetails.html
new file mode 100644
index 0000000000..f3bd5c178c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.GetGcpVisionAnnotateFilesOperationStatus/additionalDetails.html
@@ -0,0 +1,34 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+
+<head>
+    <meta charset="utf-8"/>
+    <title>Google Vision</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css"/>
+</head>
+<body>
+
+<h1>Google Cloud Vision - Get Annotate Files Status</h1>
+
+<h3>Usage</h3>
+<p>
+    GetGcpVisionAnnotateFilesOperationStatus is designed to periodically check 
the statuses of file annotation operations. This processor should be used in 
pair with StartGcpVisionAnnotateFilesOperation Processor.
+    An outgoing FlowFile contains the raw response returned by the Vision 
server. This response is in JSON format and contains a Google Cloud Storage 
reference where the result is located, as well as additional metadata, as 
described in the <a 
href="https://cloud.google.com/vision/docs/reference/rest/v1/locations.operations#Operation"
 target="_blank">Google Vision API Reference document</a>.
+</p>
+
+</body>
+</html>
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.GetGcpVisionAnnotateImagesOperationStatus/additionalDetails.html
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.GetGcpVisionAnnotateImagesOperationStatus/additionalDetails.html
new file mode 100644
index 0000000000..1f2654b009
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.GetGcpVisionAnnotateImagesOperationStatus/additionalDetails.html
@@ -0,0 +1,34 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+
+<head>
+    <meta charset="utf-8"/>
+    <title>Google Vision</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css"/>
+</head>
+<body>
+
+<h1>Google Cloud Vision - Get Annotate Images Status</h1>
+
+<h3>Usage</h3>
+<p>
+    GetGcpVisionAnnotateImagesOperationStatus is designed to periodically 
check the statuses of image annotation operations. This processor should be 
used in pair with StartGcpVisionAnnotateImagesOperation Processor.
+    An outgoing FlowFile contains the raw response returned by the Vision 
server. This response is in JSON format and contains a Google Cloud Storage 
reference where the result is located, as well as additional metadata, as 
described in the <a 
href="https://cloud.google.com/vision/docs/reference/rest/v1/locations.operations#Operation"
 target="_blank">Google Vision API Reference document</a>.
+</p>
+
+</body>
+</html>
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateFilesOperation/additionalDetails.html
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateFilesOperation/additionalDetails.html
new file mode 100644
index 0000000000..fe99aefff2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateFilesOperation/additionalDetails.html
@@ -0,0 +1,104 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+
+<head>
+    <meta charset="utf-8"/>
+    <title>Google Vision</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css"/>
+</head>
+<body>
+
+<h1>Google Cloud Vision - Start Annotate Files Operation</h1>
+<p>
+    Prerequisites
+<ul>
+    <li>Make sure Vision API is enabled and the account you are using has the 
right to use it</li>
+    <li>Make sure the input file(s) are available in a GCS bucket</li>
+</ul>
+</p>
+<h3>Usage</h3>
+<p>
+    StartGcpVisionAnnotateFilesOperation is designed to trigger file 
annotation operations. This processor should be used in pair with the 
GetGcpVisionAnnotateFilesOperationStatus Processor.
+    Outgoing FlowFiles contain the raw response to the request returned by the 
Vision server. The response is in JSON format and contains the result and 
additional metadata as written in the Google Vision API Reference documents.
+</p>
+
+<h3>Payload</h3>
+<p>
+    The JSON Payload is a request in JSON format as documented in the <a 
href="https://cloud.google.com/vision/docs/reference/rest/v1/files/asyncBatchAnnotate"
 target="_blank">Google Vision REST API reference document</a>.
+    Payload can be fed to the processor via the <code>JSON Payload</code> 
property or as a FlowFile content. The property has higher precedence over 
FlowFile content.
+    Please make sure to delete the default value of the property if you want 
to use FlowFile content payload.
+    A JSON payload template example:
+</p>
+<code>
+    <pre>
+{
+    "requests": [
+        {
+            "inputConfig": {
+                "gcsSource": {
+                    "uri": "gs://${gcs.bucket}/${filename}"
+                },
+                "mimeType": "application/pdf"
+            },
+            "features": [{
+                    "type": "DOCUMENT_TEXT_DETECTION",
+                    "maxResults": 4
+                }],
+            "outputConfig": {
+                "gcsDestination": {
+                    "uri": "gs://${gcs.bucket}/${filename}/"
+                },
+                "batchSize": 2
+            }
+        }]
+}
+    </pre>
+</code>
+<h3>Features types</h3>
+<ul>
+    <li>TEXT_DETECTION: Optical character recognition (OCR) for an image; text 
recognition and conversion to machine-coded text. Identifies and extracts UTF-8 
text in an image.</li>
+    <li>DOCUMENT_TEXT_DETECTION: Optical character recognition (OCR) for a 
file (PDF/TIFF) or dense text image; dense text recognition and conversion to 
machine-coded text.</li>
+</ul>
+You can find more details at <a 
href="https://cloud.google.com/vision/docs/features-list" 
target="_blank">Google Vision Feature List</a>
+
+<h3>Example: How to set up a simple Annotate Files Flow</h3>
+<p>
+    Prerequisites
+</p>
+<p>
+<ul>
+    <li>Input files should be available in a GCS bucket</li>
+    <li>This bucket must not contain anything else but the input files</li>
+</ul>
+</p>
+<p>Create the following flow</p>
+<img src="vision-annotate-files.png" style="height: 50%; width: 50%"/>
+<p>
+    Keep the default value of the JSON Payload property in 
StartGcpVisionAnnotateFilesOperation
+</p>
+<p>
+    Execution steps:
+<ul>
+    <li>ListGCSBucket processor will return a list of files in the bucket at 
the first run.</li>
+    <li>ListGCSBucket will return only new items at subsequent runs.</li>
+    <li>StartGcpVisionAnnotateFilesOperation processor will trigger GCP Vision 
file annotation jobs based on the JSON payload.</li>
+    <li>StartGcpVisionAnnotateFilesOperation processor will populate the 
<code>operationKey</code> flow file attribute.</li>
+    <li>GetGcpVisionAnnotateFilesOperationStatus processor will periodically 
query status of the job.</li>
+</ul>
+</p>
+</body>
+</html>
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateFilesOperation/vision-annotate-files.png
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateFilesOperation/vision-annotate-files.png
new file mode 100644
index 0000000000..65635d2bfc
Binary files /dev/null and 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateFilesOperation/vision-annotate-files.png
 differ
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateImagesOperation/additionalDetails.html
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateImagesOperation/additionalDetails.html
new file mode 100644
index 0000000000..e15796fcb0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateImagesOperation/additionalDetails.html
@@ -0,0 +1,105 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+
+<head>
+       <meta charset="utf-8"/>
+       <title>Google Vision</title>
+       <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css"/>
+</head>
+<body>
+
+<h1>Google Cloud Vision - Start Annotate Images Operation</h1>
+<p>
+       Prerequisites
+<ul>
+       <li>Make sure Vision API is enabled and the account you are using has 
the right to use it</li>
+       <li>Make sure the input image(s) are available in a GCS bucket</li>
+</ul>
+</p>
+<h3>Usage</h3>
+<p>
+       StartGcpVisionAnnotateImagesOperation is designed to trigger image 
annotation operations. This processor should be used in pair with the 
GetGcpVisionAnnotateImagesOperationStatus Processor.
+       Outgoing FlowFiles contain the raw response to the request returned by 
the Vision server. The response is in JSON format and contains the result and 
additional metadata as written in the Google Vision API Reference documents.
+</p>
+<h3>Payload</h3>
+<p>
+       The JSON Payload is a request in JSON format as documented in the <a 
href="https://cloud.google.com/vision/docs/reference/rest/v1/images/asyncBatchAnnotate"
 target="_blank">Google Vision REST API reference document</a>.
+       Payload can be fed to the processor via the <code>JSON Payload</code> 
property or as a FlowFile content. The property has higher precedence over 
FlowFile content.
+       Please make sure to delete the default value of the property if you 
want to use FlowFile content payload.
+       A JSON payload template example:
+</p>
+
+<code>
+    <pre>
+{
+       "requests": [{
+               "image": {
+                       "source": {
+                               "imageUri": "gs://${gcs.bucket}/${filename}"
+                       }
+               },
+               "features": [{
+                       "type": "DOCUMENT_TEXT_DETECTION",
+                       "maxResults": 4
+               }]
+       }],
+       "outputConfig": {
+               "gcsDestination": {
+                       "uri": "gs://${gcs.bucket}/${filename}/"
+               },
+               "batchSize": 2
+       }
+}
+    </pre>
+</code>
+<h3>Features types</h3>
+<ul>
+       <li>TEXT_DETECTION: Optical character recognition (OCR) for an image; 
text recognition and conversion to machine-coded text. Identifies and extracts 
UTF-8 text in an image.</li>
+       <li>DOCUMENT_TEXT_DETECTION: Optical character recognition (OCR) for a 
file (PDF/TIFF) or dense text image; dense text recognition and conversion to 
machine-coded text.</li>
+       <li>LANDMARK_DETECTION: Provides the name of the landmark, a confidence 
score and a bounding box in the image for the landmark.</li>
+       <li>LOGO_DETECTION: Provides a textual description of the entity 
identified, a confidence score, and a bounding polygon for the logo in the 
file.</li>
+       <li>LABEL_DETECTION: Provides generalized labels for an image.</li>
+       <li>etc.</li>
+</ul>
+You can find more details at <a 
href="https://cloud.google.com/vision/docs/features-list" 
target="_blank">Google Vision Feature List</a>
+<h3>Example: How to set up a simple Annotate Image Flow</h3>
+<p>
+       Prerequisites
+</p>
+<p>
+<ul>
+       <li>Input image files should be available in a GCS bucket</li>
+       <li>This bucket must not contain anything else but the input image 
files</li>
+</ul>
+</p>
+<p>Create the following flow</p>
+<img src="vision-annotate-images.png" style="height: 50%; width: 50%"/>
+<p>
+       Keep the default value of JSON PAYLOAD property in 
StartGcpVisionAnnotateImagesOperation
+</p>
+<p>
+       Execution steps:
+<ul>
+       <li>ListGCSBucket processor will return a list of files in the bucket 
at the first run.</li>
+       <li>ListGCSBucket will return only new items at subsequent runs.</li>
+       <li>StartGcpVisionAnnotateImagesOperation processor will trigger GCP 
Vision image annotation jobs based on the JSON payload.</li>
+       <li>StartGcpVisionAnnotateImagesOperation processor will populate the 
<code>operationKey</code> flow file attribute.</li>
+       <li>GetGcpVisionAnnotateImagesOperationStatus processor will 
periodically query status of the job.</li>
+</ul>
+</p>
+</body>
+</html>
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateImagesOperation/vision-annotate-images.png
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateImagesOperation/vision-annotate-images.png
new file mode 100644
index 0000000000..e4ed82c406
Binary files /dev/null and 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateImagesOperation/vision-annotate-images.png
 differ
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/vision/GetGcpVisionAnnotateFilesOperationStatusTest.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/vision/GetGcpVisionAnnotateFilesOperationStatusTest.java
new file mode 100644
index 0000000000..b64d441d1c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/vision/GetGcpVisionAnnotateFilesOperationStatusTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.vision;
+
+import static 
org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+import static 
org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.GCP_OPERATION_KEY;
+import static 
org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.REL_FAILURE;
+import static 
org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.REL_SUCCESS;
+import static 
org.apache.nifi.processors.gcp.vision.AbstractGetGcpVisionAnnotateOperationStatus.REL_ORIGINAL;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.vision.v1.AsyncBatchAnnotateFilesResponse;
+import com.google.cloud.vision.v1.ImageAnnotatorClient;
+import com.google.longrunning.Operation;
+import com.google.longrunning.OperationsClient;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessageV3;
+import com.google.rpc.Status;
+import java.util.Collections;
+import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
+import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class GetGcpVisionAnnotateFilesOperationStatusTest {
+    private static final String PLACEHOLDER_CONTENT = "content";
+    private static final String OPERATION_KEY = "operationKey";
+    private TestRunner runner = null;
+    private GetGcpVisionAnnotateFilesOperationStatus processor;
+    @Mock
+    private ImageAnnotatorClient mockVisionClient;
+    private GCPCredentialsService gcpCredentialsService;
+    @Mock
+    private OperationsClient operationClient;
+    @Mock
+    private Operation operation;
+
+    @BeforeEach
+    public void setUp() throws InitializationException {
+        gcpCredentialsService = new GCPCredentialsControllerService();
+        processor = new GetGcpVisionAnnotateFilesOperationStatus() {
+            @Override
+            protected ImageAnnotatorClient getVisionClient() {
+                return mockVisionClient;
+            }
+
+            @Override
+            protected GeneratedMessageV3 deserializeResponse(ByteString 
responseValue) {
+                return AsyncBatchAnnotateFilesResponse.newBuilder().build();
+            }
+        };
+        runner = TestRunners.newTestRunner(processor);
+        runner.addControllerService("gcp-credentials-provider-service-id", 
gcpCredentialsService);
+        runner.enableControllerService(gcpCredentialsService);
+        runner.setProperty(GCP_CREDENTIALS_PROVIDER_SERVICE, 
"gcp-credentials-provider-service-id");
+        runner.assertValid(gcpCredentialsService);
+    }
+
+    @Test
+    public void testGetAnnotateFilesJobStatusSuccess() {
+        
when(mockVisionClient.getOperationsClient()).thenReturn(operationClient);
+        
when(operationClient.getOperation(OPERATION_KEY)).thenReturn(operation);
+        when(operation.getResponse()).thenReturn(Any.newBuilder().build());
+        when(operation.getDone()).thenReturn(true);
+        when(operation.hasError()).thenReturn(false);
+        runner.enqueue(PLACEHOLDER_CONTENT, 
Collections.singletonMap(GCP_OPERATION_KEY, OPERATION_KEY));
+        runner.run();
+
+        runner.assertTransferCount(REL_SUCCESS, 1);
+        runner.assertTransferCount(REL_ORIGINAL, 1);
+    }
+
+    @Test
+    public void testGetAnnotateFilesJobStatusInProgress() {
+        
when(mockVisionClient.getOperationsClient()).thenReturn(operationClient);
+        
when(operationClient.getOperation(OPERATION_KEY)).thenReturn(operation);
+        when(operation.getDone()).thenReturn(true);
+        when(operation.hasError()).thenReturn(true);
+        runner.enqueue(PLACEHOLDER_CONTENT, 
Collections.singletonMap(GCP_OPERATION_KEY, OPERATION_KEY));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testGetAnnotateImagesJobStatusFailed() {
+        
when(mockVisionClient.getOperationsClient()).thenReturn(operationClient);
+        
when(operationClient.getOperation(OPERATION_KEY)).thenReturn(operation);
+        when(operation.getDone()).thenReturn(true);
+        when(operation.hasError()).thenReturn(true);
+        when(operation.getError()).thenReturn(Status.newBuilder().build());
+        runner.enqueue(PLACEHOLDER_CONTENT, 
Collections.singletonMap(GCP_OPERATION_KEY, OPERATION_KEY));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_FAILURE, 1);
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/vision/GetGcpVisionAnnotateImagesOperationStatusTest.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/vision/GetGcpVisionAnnotateImagesOperationStatusTest.java
new file mode 100644
index 0000000000..d4576ca1d4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/vision/GetGcpVisionAnnotateImagesOperationStatusTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.vision;
+
+import static 
org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+import static 
org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.GCP_OPERATION_KEY;
+import static 
org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.REL_FAILURE;
+import static 
org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.REL_SUCCESS;
+import static 
org.apache.nifi.processors.gcp.vision.AbstractGetGcpVisionAnnotateOperationStatus.REL_ORIGINAL;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.vision.v1.AsyncBatchAnnotateImagesResponse;
+import com.google.cloud.vision.v1.ImageAnnotatorClient;
+import com.google.longrunning.Operation;
+import com.google.longrunning.OperationsClient;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessageV3;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.rpc.Status;
+import java.util.Collections;
+import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
+import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class GetGcpVisionAnnotateImagesOperationStatusTest {
+    private static final String PLACEHOLDER_CONTENT = "content";
+    private static final String OPERATION_KEY = "operationKey";
+    private TestRunner runner = null;
+    private GetGcpVisionAnnotateImagesOperationStatus processor;
+    @Mock
+    private ImageAnnotatorClient mockVisionClient;
+    private GCPCredentialsService gcpCredentialsService;
+    @Mock
+    private OperationsClient operationClient;
+    @Mock
+    private Operation operation;
+
+    @BeforeEach
+    public void setUp() throws InitializationException {
+        gcpCredentialsService = new GCPCredentialsControllerService();
+        processor = new GetGcpVisionAnnotateImagesOperationStatus() {
+            @Override
+            protected ImageAnnotatorClient getVisionClient() {
+                return mockVisionClient;
+            }
+
+            @Override
+            protected GeneratedMessageV3 deserializeResponse(ByteString 
responseValue) throws InvalidProtocolBufferException {
+                return AsyncBatchAnnotateImagesResponse.newBuilder().build();
+            }
+        };
+        runner = TestRunners.newTestRunner(processor);
+        runner.addControllerService("gcp-credentials-provider-service-id", 
gcpCredentialsService);
+        runner.enableControllerService(gcpCredentialsService);
+        runner.setProperty(GCP_CREDENTIALS_PROVIDER_SERVICE, 
"gcp-credentials-provider-service-id");
+        runner.assertValid(gcpCredentialsService);
+    }
+
+    @Test
+    public void testGetAnnotateImagesJobStatusSuccess() {
+        
when(mockVisionClient.getOperationsClient()).thenReturn(operationClient);
+        
when(operationClient.getOperation(OPERATION_KEY)).thenReturn(operation);
+        when(operation.getResponse()).thenReturn(Any.newBuilder().build());
+        when(operation.getDone()).thenReturn(true);
+        when(operation.hasError()).thenReturn(false);
+        runner.enqueue(PLACEHOLDER_CONTENT, 
Collections.singletonMap(GCP_OPERATION_KEY, OPERATION_KEY));
+        runner.run();
+
+        runner.assertTransferCount(REL_SUCCESS, 1);
+        runner.assertTransferCount(REL_ORIGINAL, 1);
+    }
+
+    @Test
+    public void testGetAnnotateImagesJobStatusInProgress() {
+        
when(mockVisionClient.getOperationsClient()).thenReturn(operationClient);
+        
when(operationClient.getOperation(OPERATION_KEY)).thenReturn(operation);
+        when(operation.getDone()).thenReturn(true);
+        when(operation.hasError()).thenReturn(true);
+        runner.enqueue(PLACEHOLDER_CONTENT, 
Collections.singletonMap(GCP_OPERATION_KEY, OPERATION_KEY));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testGetAnnotateImagesJobStatusFailed() {
+        
when(mockVisionClient.getOperationsClient()).thenReturn(operationClient);
+        
when(operationClient.getOperation(OPERATION_KEY)).thenReturn(operation);
+        when(operation.getDone()).thenReturn(true);
+        when(operation.hasError()).thenReturn(true);
+        when(operation.getError()).thenReturn(Status.newBuilder().build());
+        runner.enqueue(PLACEHOLDER_CONTENT, 
Collections.singletonMap(GCP_OPERATION_KEY, OPERATION_KEY));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_FAILURE, 1);
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/vision/StartGcpVisionAnnotateFilesOperationTest.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/vision/StartGcpVisionAnnotateFilesOperationTest.java
new file mode 100644
index 0000000000..4f5ccb9a56
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/vision/StartGcpVisionAnnotateFilesOperationTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.vision;
+
+import static 
org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+import static 
org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.GCP_OPERATION_KEY;
+import static 
org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.REL_FAILURE;
+import static 
org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.REL_SUCCESS;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.api.gax.longrunning.OperationSnapshot;
+import com.google.cloud.vision.v1.AsyncBatchAnnotateFilesRequest;
+import com.google.cloud.vision.v1.AsyncBatchAnnotateFilesResponse;
+import com.google.cloud.vision.v1.ImageAnnotatorClient;
+import com.google.cloud.vision.v1.OperationMetadata;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
+import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Unit tests for {@link StartGcpVisionAnnotateFilesOperation}.
+ *
+ * The processor under test is an anonymous subclass whose getVisionClient() returns a Mockito
+ * mock, so no call leaves the JVM. Each test enqueues the annotate-file.json request fixture
+ * (a request with inputConfig/features/outputConfig entries) as FlowFile content.
+ */
+@ExtendWith(MockitoExtension.class)
+public class StartGcpVisionAnnotateFilesOperationTest {
+    private TestRunner runner = null;
+    private StartGcpVisionAnnotateFilesOperation processor;
+    // Files-request fixture; this test previously pointed at the image-request fixture (annotate-image.json).
+    private static final Path FLOW_FILE_CONTENT = Paths.get("src/test/resources/vision/annotate-file.json");
+    // NOTE(review): vision, requestCaptor, apiFuture and operationSnapshot are not referenced by the
+    // tests below; they are left in place because the file's imports reference their types.
+    @Mock
+    private ImageAnnotatorClient vision;
+    @Captor
+    private ArgumentCaptor<AsyncBatchAnnotateFilesRequest> requestCaptor;
+    private String operationName = "operationName";
+    @Mock
+    private OperationFuture<AsyncBatchAnnotateFilesResponse, OperationMetadata> operationFuture;
+    @Mock
+    private ApiFuture<OperationSnapshot> apiFuture;
+    @Mock
+    private ImageAnnotatorClient mockVisionClient;
+    private GCPCredentialsService gcpCredentialsService;
+    @Mock
+    private OperationSnapshot operationSnapshot;
+
+    /**
+     * Builds the processor with its vision client replaced by the mock, then registers and
+     * enables a credentials controller service on a fresh TestRunner.
+     */
+    @BeforeEach
+    public void setUp() throws InitializationException {
+        gcpCredentialsService = new GCPCredentialsControllerService();
+        processor = new StartGcpVisionAnnotateFilesOperation() {
+            @Override
+            protected ImageAnnotatorClient getVisionClient() {
+                return mockVisionClient;
+            }
+        };
+        runner = TestRunners.newTestRunner(processor);
+        runner.addControllerService("gcp-credentials-provider-service-id", gcpCredentialsService);
+        runner.enableControllerService(gcpCredentialsService);
+        runner.setProperty(GCP_CREDENTIALS_PROVIDER_SERVICE, "gcp-credentials-provider-service-id");
+        runner.assertValid(gcpCredentialsService);
+    }
+
+    /**
+     * When the client returns an operation future, FlowFiles go to REL_SUCCESS and carry the
+     * GCP_OPERATION_KEY attribute.
+     */
+    @Test
+    public void testAnnotateFilesJob() throws ExecutionException, InterruptedException, IOException {
+        // The cast selects the single-request overload for the untyped any() matcher.
+        when(mockVisionClient.asyncBatchAnnotateFilesAsync((AsyncBatchAnnotateFilesRequest) any())).thenReturn(operationFuture);
+        when(operationFuture.getName()).thenReturn(operationName);
+        runner.enqueue(FLOW_FILE_CONTENT, Collections.emptyMap());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        runner.assertAllFlowFilesContainAttribute(REL_SUCCESS, GCP_OPERATION_KEY);
+    }
+
+    /** When the client call throws, FlowFiles are routed to REL_FAILURE. */
+    @Test
+    public void testAnnotateFilesJobFail() throws IOException {
+        when(mockVisionClient.asyncBatchAnnotateFilesAsync((AsyncBatchAnnotateFilesRequest) any())).thenThrow(new RuntimeException("ServiceError"));
+        runner.enqueue(FLOW_FILE_CONTENT, Collections.emptyMap());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_FAILURE);
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/vision/StartGcpVisionAnnotateImagesOperationTest.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/vision/StartGcpVisionAnnotateImagesOperationTest.java
new file mode 100644
index 0000000000..9ce1eac470
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/vision/StartGcpVisionAnnotateImagesOperationTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.vision;
+
+import static 
org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+import static 
org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.GCP_OPERATION_KEY;
+import static 
org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.REL_SUCCESS;
+import static 
org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateImagesOperation.JSON_PAYLOAD;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.api.gax.longrunning.OperationSnapshot;
+import com.google.cloud.vision.v1.ImageAnnotatorClient;
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import org.apache.commons.io.FileUtils;
+import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
+import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Unit tests for {@link StartGcpVisionAnnotateImagesOperation}.
+ *
+ * The processor under test is an anonymous subclass whose getVisionClient() returns a Mockito
+ * mock. The request JSON is read from the annotate-image.json fixture and supplied through the
+ * JSON_PAYLOAD property; no FlowFile is enqueued by these tests.
+ */
+@ExtendWith(MockitoExtension.class)
+public class StartGcpVisionAnnotateImagesOperationTest {
+    private static final String CREDENTIALS_SERVICE_ID = "gcp-credentials-provider-service-id";
+    private static final String PAYLOAD_RESOURCE = "src/test/resources/vision/annotate-image.json";
+
+    @Mock
+    private OperationFuture operationFuture;
+    @Mock
+    private ApiFuture<OperationSnapshot> apiFuture;
+    @Mock
+    private ImageAnnotatorClient mockVisionClient;
+    @Mock
+    private OperationSnapshot operationSnapshot;
+
+    private TestRunner runner = null;
+    private StartGcpVisionAnnotateImagesOperation processor;
+    private GCPCredentialsService gcpCredentialsService;
+    private String operationName = "operationName";
+    private String jsonPayloadValue;
+
+    /**
+     * Loads the request fixture, builds the processor with its vision client replaced by the
+     * mock, registers and enables a credentials controller service, and sets JSON_PAYLOAD.
+     */
+    @BeforeEach
+    public void setUp() throws InitializationException, IOException {
+        jsonPayloadValue = FileUtils.readFileToString(new File(PAYLOAD_RESOURCE), "UTF-8");
+        gcpCredentialsService = new GCPCredentialsControllerService();
+        processor = new StartGcpVisionAnnotateImagesOperation() {
+            @Override
+            protected ImageAnnotatorClient getVisionClient() {
+                return mockVisionClient;
+            }
+        };
+        runner = TestRunners.newTestRunner(processor);
+        runner.addControllerService(CREDENTIALS_SERVICE_ID, gcpCredentialsService);
+        runner.enableControllerService(gcpCredentialsService);
+        runner.setProperty(GCP_CREDENTIALS_PROVIDER_SERVICE, CREDENTIALS_SERVICE_ID);
+        runner.assertValid(gcpCredentialsService);
+        runner.setProperty(JSON_PAYLOAD, jsonPayloadValue);
+    }
+
+    /** FlowFiles go to REL_SUCCESS and carry GCP_OPERATION_KEY when the client returns an operation future. */
+    @Test
+    public void testAnnotateImageJob() throws ExecutionException, InterruptedException, IOException {
+        runAndExpectOperationKeyOnSuccess();
+    }
+
+    /**
+     * Runs the same scenario as {@link #testAnnotateImageJob()}.
+     *
+     * NOTE(review): this test repeats testAnnotateImageJob statement for statement and its
+     * "Files" name does not match the Images processor under test - likely a copy-paste
+     * leftover; confirm whether a different scenario was intended.
+     */
+    @Test
+    public void testAnnotateFilesJob() throws ExecutionException, InterruptedException, IOException {
+        runAndExpectOperationKeyOnSuccess();
+    }
+
+    /**
+     * Stubs the client to hand back the mocked operation future, triggers the processor once and
+     * checks that every transferred FlowFile is on REL_SUCCESS with the GCP_OPERATION_KEY attribute.
+     */
+    private void runAndExpectOperationKeyOnSuccess() {
+        when(mockVisionClient.asyncBatchAnnotateImagesAsync(any())).thenReturn(operationFuture);
+        when(operationFuture.getName()).thenReturn(operationName);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        runner.assertAllFlowFilesContainAttribute(REL_SUCCESS, GCP_OPERATION_KEY);
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/vision/annotate-file.json
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/vision/annotate-file.json
new file mode 100644
index 0000000000..ff5c3128ac
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/vision/annotate-file.json
@@ -0,0 +1,21 @@
+{
+  "requests": [
+    {
+      "inputConfig": {
+        "gcsSource": {
+          "uri": "gs://qe-dim-external/kjantner-vision-test/TestDoc.pdf"
+        },
+        "mimeType": "application/pdf"
+      },
+      "features": [{
+        "type": "DOCUMENT_TEXT_DETECTION",
+        "maxResults": 4
+      }],
+      "outputConfig": {
+        "gcsDestination": {
+          "uri": "gs://qe-dim-external/kjantner-vision-test/results-files/"
+        },
+        "batchSize": 2
+      }
+    }]
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/vision/annotate-image.json
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/vision/annotate-image.json
new file mode 100644
index 0000000000..510d8052e5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/vision/annotate-image.json
@@ -0,0 +1,19 @@
+{
+  "requests": [{
+    "image": {
+      "source": {
+        "imageUri": "gs://qe-dim-external/kjantner-vision-test/vision-test.png"
+      }
+    },
+    "features": [{
+      "type": "FACE_DETECTION",
+      "maxResults": 4
+    }]
+  }],
+  "outputConfig": {
+    "gcsDestination": {
+      "uri": "gs://qe-dim-external/kjantner-vision-test/results/"
+    },
+    "batchSize": 2
+  }
+}
\ No newline at end of file

Reply via email to