This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 67925b171a NIFI-10953: Implement GCP Vision AI processors
67925b171a is described below
commit 67925b171aa4c629fca7c3771a43779425bfd8b2
Author: Kalman Jantner <[email protected]>
AuthorDate: Tue Dec 6 16:56:29 2022 +0100
NIFI-10953: Implement GCP Vision AI processors
This closes #6762.
Signed-off-by: Tamas Palfy <[email protected]>
---
.../nifi-gcp-bundle/nifi-gcp-processors/pom.xml | 22 ++++
.../gcp/vision/AbstractGcpVisionProcessor.java | 85 +++++++++++++++
...bstractGetGcpVisionAnnotateOperationStatus.java | 114 +++++++++++++++++++
.../vision/AbstractStartGcpVisionOperation.java | 81 ++++++++++++++
.../GetGcpVisionAnnotateFilesOperationStatus.java | 41 +++++++
.../GetGcpVisionAnnotateImagesOperationStatus.java | 41 +++++++
.../StartGcpVisionAnnotateFilesOperation.java | 94 ++++++++++++++++
.../StartGcpVisionAnnotateImagesOperation.java | 94 ++++++++++++++++
.../services/org.apache.nifi.processor.Processor | 6 +-
.../additionalDetails.html | 34 ++++++
.../additionalDetails.html | 34 ++++++
.../additionalDetails.html | 104 ++++++++++++++++++
.../vision-annotate-files.png | Bin 0 -> 297707 bytes
.../additionalDetails.html | 105 ++++++++++++++++++
.../vision-annotate-images.png | Bin 0 -> 291842 bytes
...tGcpVisionAnnotateFilesOperationStatusTest.java | 120 ++++++++++++++++++++
...GcpVisionAnnotateImagesOperationStatusTest.java | 121 +++++++++++++++++++++
.../StartGcpVisionAnnotateFilesOperationTest.java | 106 ++++++++++++++++++
.../StartGcpVisionAnnotateImagesOperationTest.java | 100 +++++++++++++++++
.../src/test/resources/vision/annotate-file.json | 21 ++++
.../src/test/resources/vision/annotate-image.json | 19 ++++
21 files changed, 1341 insertions(+), 1 deletion(-)
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
index a70a98e046..8c5871ebf8 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
@@ -187,6 +187,28 @@
<artifactId>jackson-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.cloud</groupId>
+ <artifactId>google-cloud-vision</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcprov-jdk15on</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcpkix-jdk15on</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/AbstractGcpVisionProcessor.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/AbstractGcpVisionProcessor.java
new file mode 100644
index 0000000000..4530ac3f8c
--- /dev/null
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/AbstractGcpVisionProcessor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.vision;
+
+import static
org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.vision.v1.ImageAnnotatorClient;
+import com.google.cloud.vision.v1.ImageAnnotatorSettings;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+/**
+ * Shared base class of the GCP Vision processors. It declares the common
+ * {@code success} and {@code failure} relationships, exposes the GCP credentials
+ * service property and creates the {@link ImageAnnotatorClient} when the
+ * processor is scheduled.
+ */
+public abstract class AbstractGcpVisionProcessor extends AbstractProcessor {
+    /** Attribute name under which the Vision operation identifier is stored on FlowFiles. */
+    public static final String GCP_OPERATION_KEY = "operationKey";
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles are routed to success relationship")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles are routed to failure relationship")
+            .build();
+
+    /** Relationships offered by default; subclasses may publish their own set. */
+    protected static final Set<Relationship> relationships;
+    /** Properties offered by default; subclasses may publish an extended list. */
+    protected static final List<PropertyDescriptor> properties;
+
+    static {
+        final Set<Relationship> defaultRelationships = new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE));
+        relationships = Collections.unmodifiableSet(defaultRelationships);
+        properties = Collections.unmodifiableList(Collections.singletonList(GCP_CREDENTIALS_PROVIDER_SERVICE));
+    }
+
+    /** Client created in {@link #onScheduled(ProcessContext)}; {@code null} until then. */
+    private ImageAnnotatorClient vision;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    /**
+     * Builds the Vision client from the credentials supplied by the configured
+     * {@link GCPCredentialsService}.
+     *
+     * @param context the process context used to resolve the credentials service
+     * @throws ProcessException if the credentials cannot be obtained or the client cannot be created
+     */
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        final GCPCredentialsService credentialsService = context
+                .getProperty(GCP_CREDENTIALS_PROVIDER_SERVICE)
+                .asControllerService(GCPCredentialsService.class);
+        try {
+            final GoogleCredentials googleCredentials = credentialsService.getGoogleCredentials();
+            final ImageAnnotatorSettings settings = ImageAnnotatorSettings.newBuilder()
+                    .setCredentialsProvider(FixedCredentialsProvider.create(googleCredentials))
+                    .build();
+            vision = ImageAnnotatorClient.create(settings);
+        } catch (Exception e) {
+            getLogger().error("Failed to create vision client.", e);
+            throw new ProcessException("Failed to create vision client.", e);
+        }
+    }
+
+    /**
+     * @return the client created by {@link #onScheduled(ProcessContext)}, or {@code null} if it has not run successfully
+     */
+    protected ImageAnnotatorClient getVisionClient() {
+        return this.vision;
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/AbstractGetGcpVisionAnnotateOperationStatus.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/AbstractGetGcpVisionAnnotateOperationStatus.java
new file mode 100644
index 0000000000..0a8db05d75
--- /dev/null
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/AbstractGetGcpVisionAnnotateOperationStatus.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.vision;
+
+import static
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+
+import com.google.longrunning.Operation;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessageV3;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.util.JsonFormat;
+import com.google.rpc.Status;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+abstract public class AbstractGetGcpVisionAnnotateOperationStatus extends
AbstractGcpVisionProcessor {
+ public static final PropertyDescriptor OPERATION_KEY = new
PropertyDescriptor.Builder()
+ .name("operationKey")
+ .displayName("GCP Operation Key")
+ .description("The unique identifier of the Vision operation.")
+ .defaultValue("${operationKey}")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+ .build();
+ public static final Relationship REL_RUNNING = new Relationship.Builder()
+ .name("running")
+ .description("The job is currently still being processed")
+ .build();
+ public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+ .name("original")
+ .description("Upon successful completion, the original FlowFile
will be routed to this relationship.")
+ .autoTerminateDefault(true)
+ .build();
+ private static final List<PropertyDescriptor> PROPERTIES =
+ Collections.unmodifiableList(Stream.concat(properties.stream(),
Stream.of(OPERATION_KEY)).collect(Collectors.toList()));
+ private static final Set<Relationship> relationships =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+ REL_ORIGINAL,
+ REL_SUCCESS,
+ REL_FAILURE,
+ REL_RUNNING
+ )));
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+ try {
+ String operationKey =
context.getProperty(OPERATION_KEY).evaluateAttributeExpressions(flowFile).getValue();;
+ Operation operation =
getVisionClient().getOperationsClient().getOperation(operationKey);
+ getLogger().info(operation.toString());
+ if (operation.getDone() && !operation.hasError()) {
+ GeneratedMessageV3 response =
deserializeResponse(operation.getResponse().getValue());
+ FlowFile childFlowFile = session.create(flowFile);
+ session.write(childFlowFile, out ->
out.write(JsonFormat.printer().print(response).getBytes(StandardCharsets.UTF_8)));
+ session.putAttribute(childFlowFile,
CoreAttributes.MIME_TYPE.key(), "application/json");
+ session.transfer(flowFile, REL_ORIGINAL);
+ session.transfer(childFlowFile, REL_SUCCESS);
+ } else if (!operation.getDone()) {
+ session.transfer(flowFile, REL_RUNNING);
+ } else {
+ Status error = operation.getError();
+ getLogger().error("Failed to execute vision operation. Error
code: {}, Error message: {}", error.getCode(), error.getMessage());
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ } catch (Exception e) {
+ getLogger().error("Fail to get GCP Vision operation's status", e);
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ }
+
+ abstract protected GeneratedMessageV3 deserializeResponse(ByteString
responseValue) throws InvalidProtocolBufferException;
+}
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/AbstractStartGcpVisionOperation.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/AbstractStartGcpVisionOperation.java
new file mode 100644
index 0000000000..373d933b7e
--- /dev/null
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/AbstractStartGcpVisionOperation.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.vision;
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.protobuf.util.JsonFormat;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+/**
+ * Base class for processors that start a Vision long-running operation from a JSON request.
+ * The request is taken from the JSON payload property when it is set, otherwise from the
+ * FlowFile content. The name of the started operation is written to the
+ * {@code operationKey} attribute of the outgoing FlowFile.
+ *
+ * @param <B> builder type of the request message the JSON is merged into
+ */
+public abstract class AbstractStartGcpVisionOperation<B extends com.google.protobuf.GeneratedMessageV3.Builder<B>> extends AbstractGcpVisionProcessor {
+
+    /**
+     * Starts the operation and routes the FlowFile to {@code success} with the operation
+     * name attached, or to {@code failure} if anything goes wrong. Without an incoming
+     * FlowFile the processor only runs when the JSON payload property is set, in which
+     * case a new FlowFile is created.
+     *
+     * @param context provides the JSON payload property
+     * @param session the session FlowFiles are taken from, created in and routed in
+     * @throws ProcessException declared by the processor contract; failures seen here are routed to {@code failure}
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            if (!context.getProperty(getJsonPayloadPropertyDescriptor()).isSet()) {
+                return;
+            }
+            flowFile = session.create();
+        }
+        try {
+            final OperationFuture<?, ?> asyncResponse = startOperation(session, context, flowFile);
+            final String operationName = asyncResponse.getName();
+            // putAttribute returns the updated FlowFile; continue with that reference.
+            flowFile = session.putAttribute(flowFile, GCP_OPERATION_KEY, operationName);
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // Keep the interrupt visible to the framework instead of swallowing it.
+                Thread.currentThread().interrupt();
+            }
+            getLogger().error("Fail to start GCP Vision operation", e);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+    /**
+     * Closes the Vision client, if one was created.
+     *
+     * @throws IOException kept for signature compatibility
+     */
+    @OnStopped
+    public void onStopped() throws IOException {
+        // The client is absent when scheduling failed before it could be created.
+        if (getVisionClient() != null) {
+            getVisionClient().close();
+        }
+    }
+
+    /**
+     * Parses the JSON request (unknown fields are ignored) into a fresh builder and starts the operation.
+     *
+     * @param session used to read the FlowFile content when the payload property is not set
+     * @param context provides the JSON payload property
+     * @param flowFile source of attributes for expression evaluation and, if needed, of the request content
+     * @return the future of the started operation
+     * @throws ProcessException if reading or parsing the request fails with an {@link IOException}
+     */
+    protected OperationFuture<?, ?> startOperation(ProcessSession session, ProcessContext context, FlowFile flowFile) {
+        final B builder = newBuilder();
+        final InputStream source = context.getProperty(getJsonPayloadPropertyDescriptor()).isSet()
+                ? getInputStreamFromProperty(context, flowFile)
+                : session.read(flowFile);
+        try (InputStream inputStream = source) {
+            // Decode explicitly as UTF-8 rather than with the platform default charset.
+            JsonFormat.parser().ignoringUnknownFields().merge(new InputStreamReader(inputStream, StandardCharsets.UTF_8), builder);
+        } catch (final IOException e) {
+            throw new ProcessException("Read FlowFile Failed", e);
+        }
+        return startOperation(builder);
+    }
+
+    /** Evaluates the JSON payload property against the FlowFile and exposes it as a UTF-8 byte stream. */
+    private InputStream getInputStreamFromProperty(ProcessContext context, FlowFile flowFile) {
+        final String payload = context.getProperty(getJsonPayloadPropertyDescriptor())
+                .evaluateAttributeExpressions(flowFile)
+                .getValue();
+        return new ByteArrayInputStream(payload.getBytes(StandardCharsets.UTF_8));
+    }
+
+    /** @return a new, empty request builder */
+    abstract B newBuilder();
+
+    /**
+     * @param builder the populated request builder
+     * @return the future of the operation started from the built request
+     */
+    abstract OperationFuture<?, ?> startOperation(B builder);
+
+    /** @return the descriptor of the property that may hold the JSON request */
+    abstract PropertyDescriptor getJsonPayloadPropertyDescriptor();
+}
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/GetGcpVisionAnnotateFilesOperationStatus.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/GetGcpVisionAnnotateFilesOperationStatus.java
new file mode 100644
index 0000000000..ab51f012ce
--- /dev/null
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/GetGcpVisionAnnotateFilesOperationStatus.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.vision;
+
+import com.google.cloud.vision.v1p2beta1.AsyncBatchAnnotateFilesResponse;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessageV3;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+
+/**
+ * Status processor for file annotation operations: supplies the response parser
+ * used by {@link AbstractGetGcpVisionAnnotateOperationStatus}.
+ */
+@Tags({"Google", "Cloud", "Vision", "Machine Learning"})
+@CapabilityDescription("Retrieves the current status of a Google Vision operation.")
+@SeeAlso({StartGcpVisionAnnotateFilesOperation.class})
+@ReadsAttributes({
+    @ReadsAttribute(attribute = "operationKey", description = "A unique identifier of the operation designated by the Vision server.")
+})
+public class GetGcpVisionAnnotateFilesOperationStatus extends AbstractGetGcpVisionAnnotateOperationStatus {
+    /**
+     * Parses the operation response bytes as an {@link AsyncBatchAnnotateFilesResponse}.
+     *
+     * @param responseValue the raw bytes taken from the operation's response
+     * @return the parsed response message
+     * @throws InvalidProtocolBufferException if the bytes cannot be parsed
+     */
+    @Override
+    protected GeneratedMessageV3 deserializeResponse(ByteString responseValue) throws InvalidProtocolBufferException {
+        // NOTE(review): this file imports AsyncBatchAnnotateFilesResponse from the v1p2beta1 package while
+        // StartGcpVisionAnnotateFilesOperation builds its request with the v1 package - confirm the
+        // version mismatch is intended.
+        return AsyncBatchAnnotateFilesResponse.parseFrom(responseValue);
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/GetGcpVisionAnnotateImagesOperationStatus.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/GetGcpVisionAnnotateImagesOperationStatus.java
new file mode 100644
index 0000000000..75d657b05f
--- /dev/null
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/GetGcpVisionAnnotateImagesOperationStatus.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.vision;
+
+import com.google.cloud.vision.v1.AsyncBatchAnnotateImagesResponse;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessageV3;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+
+/**
+ * Status processor for image annotation operations: supplies the response parser
+ * used by {@link AbstractGetGcpVisionAnnotateOperationStatus}.
+ */
+@Tags({"Google", "Cloud", "Vision", "Machine Learning"})
+@CapabilityDescription("Retrieves the current status of a Google Vision operation.")
+@SeeAlso({StartGcpVisionAnnotateImagesOperation.class})
+@ReadsAttributes({
+    @ReadsAttribute(attribute = "operationKey", description = "A unique identifier of the operation designated by the Vision server.")
+})
+public class GetGcpVisionAnnotateImagesOperationStatus extends AbstractGetGcpVisionAnnotateOperationStatus {
+    /**
+     * Parses the operation response bytes as an {@link AsyncBatchAnnotateImagesResponse}.
+     *
+     * @param responseValue the raw bytes taken from the operation's response
+     * @return the parsed response message
+     * @throws InvalidProtocolBufferException if the bytes cannot be parsed
+     */
+    @Override
+    protected GeneratedMessageV3 deserializeResponse(ByteString responseValue) throws InvalidProtocolBufferException {
+        return AsyncBatchAnnotateImagesResponse.parseFrom(responseValue);
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/StartGcpVisionAnnotateFilesOperation.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/StartGcpVisionAnnotateFilesOperation.java
new file mode 100644
index 0000000000..5c7014f722
--- /dev/null
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/StartGcpVisionAnnotateFilesOperation.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.vision;
+
+import static
org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.vision.v1.AsyncBatchAnnotateFilesRequest;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+/**
+ * Starts an asynchronous batch file annotation operation from a JSON request that is
+ * merged into an {@link AsyncBatchAnnotateFilesRequest}.
+ */
+@Tags({"Google", "Cloud", "Machine Learning", "Vision"})
+@CapabilityDescription("Trigger a Vision operation on file input. It should be followed by GetGcpVisionAnnotateFilesOperationStatus processor in order to monitor operation status.")
+@SeeAlso({GetGcpVisionAnnotateFilesOperationStatus.class})
+@WritesAttributes({
+    @WritesAttribute(attribute = "operationKey", description = "A unique identifier of the operation returned by the Vision server.")
+})
+public class StartGcpVisionAnnotateFilesOperation extends AbstractStartGcpVisionOperation<AsyncBatchAnnotateFilesRequest.Builder> {
+    /** Optional JSON request; when it is not set the FlowFile content is used as the request. */
+    public static final PropertyDescriptor JSON_PAYLOAD = new PropertyDescriptor.Builder()
+            .name("json-payload")
+            .displayName("JSON Payload")
+            // The description previously named AWS; this processor sends the request to Google Cloud Vision.
+            .description("JSON request for Google Cloud Vision. The Processor will use FlowFile content for the request when this property is not specified.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .defaultValue("{\n" +
+                    " \"requests\": [\n" +
+                    " {\n" +
+                    " \"inputConfig\": {\n" +
+                    " \"gcsSource\": {\n" +
+                    " \"uri\": \"gs://${gcs.bucket}/${filename}\"\n" +
+                    " },\n" +
+                    " \"mimeType\": \"application/pdf\"\n" +
+                    " },\n" +
+                    " \"features\": [{\n" +
+                    " \"type\": \"DOCUMENT_TEXT_DETECTION\",\n" +
+                    " \"maxResults\": 4\n" +
+                    " }],\n" +
+                    " \"outputConfig\": {\n" +
+                    " \"gcsDestination\": {\n" +
+                    " \"uri\": \"gs://${gcs.bucket}/${filename}/\"\n" +
+                    " },\n" +
+                    " \"batchSize\": 2\n" +
+                    " }\n" +
+                    " }]\n" +
+                    "}")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            JSON_PAYLOAD, GCP_CREDENTIALS_PROVIDER_SERVICE));
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    /** @return a new, empty file annotation request builder */
+    @Override
+    AsyncBatchAnnotateFilesRequest.Builder newBuilder() {
+        return AsyncBatchAnnotateFilesRequest.newBuilder();
+    }
+
+    /**
+     * @param builder the populated request builder
+     * @return the future of the started file annotation operation
+     */
+    @Override
+    OperationFuture<?, ?> startOperation(AsyncBatchAnnotateFilesRequest.Builder builder) {
+        return getVisionClient().asyncBatchAnnotateFilesAsync(builder.build());
+    }
+
+    /** @return {@link #JSON_PAYLOAD} */
+    @Override
+    PropertyDescriptor getJsonPayloadPropertyDescriptor() {
+        return JSON_PAYLOAD;
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/StartGcpVisionAnnotateImagesOperation.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/StartGcpVisionAnnotateImagesOperation.java
new file mode 100644
index 0000000000..6d078ec69b
--- /dev/null
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/vision/StartGcpVisionAnnotateImagesOperation.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.vision;
+
+import static
org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.vision.v1.AsyncBatchAnnotateImagesRequest;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+/**
+ * Starts an asynchronous batch image annotation operation from a JSON request that is
+ * merged into an {@link AsyncBatchAnnotateImagesRequest}.
+ */
+@Tags({"Google", "Cloud", "Machine Learning", "Vision"})
+@CapabilityDescription("Trigger a Vision operation on image input. It should be followed by GetGcpVisionAnnotateImagesOperationStatus processor in order to monitor operation status.")
+@SeeAlso({GetGcpVisionAnnotateImagesOperationStatus.class})
+@WritesAttributes({
+    @WritesAttribute(attribute = "operationKey", description = "A unique identifier of the operation returned by the Vision server.")
+})
+public class StartGcpVisionAnnotateImagesOperation extends AbstractStartGcpVisionOperation<AsyncBatchAnnotateImagesRequest.Builder> {
+    /**
+     * Optional JSON request; when it is not set the FlowFile content is used as the request.
+     * Public to match the equivalent constant of StartGcpVisionAnnotateFilesOperation.
+     */
+    public static final PropertyDescriptor JSON_PAYLOAD = new PropertyDescriptor.Builder()
+            .name("json-payload")
+            .displayName("JSON Payload")
+            // The description previously named AWS; this processor sends the request to Google Cloud Vision.
+            .description("JSON request for Google Cloud Vision. The Processor will use FlowFile content for the request when this property is not specified.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .defaultValue("{\n" +
+                    " \"requests\": [{\n" +
+                    " \"image\": {\n" +
+                    " \"source\": {\n" +
+                    " \"imageUri\": \"gs://${gcs.bucket}/${filename}\"\n" +
+                    " }\n" +
+                    " },\n" +
+                    " \"features\": [{\n" +
+                    " \"type\": \"FACE_DETECTION\",\n" +
+                    " \"maxResults\": 4\n" +
+                    " }]\n" +
+                    " }],\n" +
+                    " \"outputConfig\": {\n" +
+                    " \"gcsDestination\": {\n" +
+                    " \"uri\": \"gs://${gcs.bucket}/${filename}/\"\n" +
+                    " },\n" +
+                    " \"batchSize\": 2\n" +
+                    " }\n" +
+                    "}")
+            // Registered once; the validator was previously added twice.
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            JSON_PAYLOAD, GCP_CREDENTIALS_PROVIDER_SERVICE));
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    /** @return a new, empty image annotation request builder */
+    @Override
+    AsyncBatchAnnotateImagesRequest.Builder newBuilder() {
+        return AsyncBatchAnnotateImagesRequest.newBuilder();
+    }
+
+    /**
+     * @param builder the populated request builder
+     * @return the future of the started image annotation operation
+     */
+    @Override
+    OperationFuture<?, ?> startOperation(AsyncBatchAnnotateImagesRequest.Builder builder) {
+        return getVisionClient().asyncBatchAnnotateImagesAsync(builder.build());
+    }
+
+    /** @return {@link #JSON_PAYLOAD} */
+    @Override
+    PropertyDescriptor getJsonPayloadPropertyDescriptor() {
+        return JSON_PAYLOAD;
+    }
+
+}
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index e0edce5fd3..99a4df40a3 100644
---
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -24,4 +24,8 @@ org.apache.nifi.processors.gcp.bigquery.PutBigQuery
org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch
org.apache.nifi.processors.gcp.bigquery.PutBigQueryStreaming
org.apache.nifi.processors.gcp.drive.ListGoogleDrive
-org.apache.nifi.processors.gcp.drive.FetchGoogleDrive
\ No newline at end of file
+org.apache.nifi.processors.gcp.drive.FetchGoogleDrive
+org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateImagesOperation
+org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateFilesOperation
+org.apache.nifi.processors.gcp.vision.GetGcpVisionAnnotateImagesOperationStatus
+org.apache.nifi.processors.gcp.vision.GetGcpVisionAnnotateFilesOperationStatus
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.GetGcpVisionAnnotateFilesOperationStatus/additionalDetails.html
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.GetGcpVisionAnnotateFilesOperationStatus/additionalDetails.html
new file mode 100644
index 0000000000..f3bd5c178c
--- /dev/null
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.GetGcpVisionAnnotateFilesOperationStatus/additionalDetails.html
@@ -0,0 +1,34 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<head>
+ <meta charset="utf-8"/>
+ <title>Google Vision</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css"
type="text/css"/>
+</head>
+<body>
+
+<h1>Google Cloud Vision - Get Annotate Files Status</h1>
+
+<h3>Usage</h3>
+<p>
+ GetGcpVisionAnnotateFilesOperationStatus is designed to periodically check
the statuses of file annotation operations. This processor should be used
together with the StartGcpVisionAnnotateFilesOperation Processor.
+ An outgoing FlowFile contains the raw response returned by the Vision
server. This response is in JSON format and contains a Google Cloud Storage
reference where the result is located, as well as additional metadata, as
written in the <a
href="https://cloud.google.com/vision/docs/reference/rest/v1/locations.operations#Operation"
target="_blank">Google Vision API Reference document</a>.
+</p>
+
+</body>
+</html>
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.GetGcpVisionAnnotateImagesOperationStatus/additionalDetails.html
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.GetGcpVisionAnnotateImagesOperationStatus/additionalDetails.html
new file mode 100644
index 0000000000..1f2654b009
--- /dev/null
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.GetGcpVisionAnnotateImagesOperationStatus/additionalDetails.html
@@ -0,0 +1,34 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<head>
+ <meta charset="utf-8"/>
+ <title>Google Vision</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css"
type="text/css"/>
+</head>
+<body>
+
+<h1>Google Cloud Vision - Get Annotate Images Status</h1>
+
+<h3>Usage</h3>
+<p>
+ GetGcpVisionAnnotateImagesOperationStatus is designed to periodically
check the statuses of image annotation operations. This processor should be
used together with the StartGcpVisionAnnotateImagesOperation Processor.
+ An outgoing FlowFile contains the raw response returned by the Vision
server. This response is in JSON format and contains a Google Cloud Storage
reference where the result is located, as well as additional metadata, as
written in the <a
href="https://cloud.google.com/vision/docs/reference/rest/v1/locations.operations#Operation"
target="_blank">Google Vision API Reference document</a>.
+</p>
+
+</body>
+</html>
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateFilesOperation/additionalDetails.html
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateFilesOperation/additionalDetails.html
new file mode 100644
index 0000000000..fe99aefff2
--- /dev/null
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateFilesOperation/additionalDetails.html
@@ -0,0 +1,104 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<head>
+ <meta charset="utf-8"/>
+ <title>Google Vision</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css"
type="text/css"/>
+</head>
+<body>
+
+<h1>Google Cloud Vision - Start Annotate Files Operation</h1>
+<p>
+ Prerequisites
+<ul>
+ <li>Make sure Vision API is enabled and the account you are using has the
right to use it</li>
+ <li>Make sure the input file(s) are available in a GCS bucket</li>
+</ul>
+</p>
+<h3>Usage</h3>
+<p>
+ StartGcpVisionAnnotateFilesOperation is designed to trigger file
annotation operations. This processor should be used together with the
GetGcpVisionAnnotateFilesOperationStatus Processor.
+ Outgoing FlowFiles contain the raw response to the request returned by the
Vision server. The response is in JSON format and contains the result and
additional metadata as written in the Google Vision API Reference documents.
+</p>
+
+<h3>Payload</h3>
+<p>
+ The JSON Payload is a request in JSON format as documented in the <a
href="https://cloud.google.com/vision/docs/reference/rest/v1/files/asyncBatchAnnotate"
target="_blank">Google Vision REST API reference document</a>.
+ Payload can be fed to the processor via the <code>JSON Payload</code>
property or as FlowFile content. The property takes precedence over
FlowFile content.
+ Please make sure to delete the default value of the property if you want
to use FlowFile content payload.
+ A JSON payload template example:
+</p>
+<code>
+ <pre>
+{
+ "requests": [
+ {
+ "inputConfig": {
+ "gcsSource": {
+ "uri": "gs://${gcs.bucket}/${filename}"
+ },
+ "mimeType": "application/pdf"
+ },
+ "features": [{
+ "type": "DOCUMENT_TEXT_DETECTION",
+ "maxResults": 4
+ }],
+ "outputConfig": {
+ "gcsDestination": {
+ "uri": "gs://${gcs.bucket}/${filename}/"
+ },
+ "batchSize": 2
+ }
+ }]
+}
+ </pre>
+</code>
+<h3>Feature types</h3>
+<ul>
+ <li>TEXT_DETECTION: Optical character recognition (OCR) for an image; text
recognition and conversion to machine-coded text. Identifies and extracts UTF-8
text in an image.</li>
+ <li>DOCUMENT_TEXT_DETECTION: Optical character recognition (OCR) for a
file (PDF/TIFF) or dense text image; dense text recognition and conversion to
machine-coded text.</li>
+</ul>
+You can find more details at <a
href="https://cloud.google.com/vision/docs/features-list"
target="_blank">Google Vision Feature List</a>
+
+<h3>Example: How to set up a simple Annotate Files Flow</h3>
+<p>
+ Prerequisites
+</p>
+<p>
+<ul>
+ <li>Input files should be available in a GCS bucket</li>
+ <li>This bucket must not contain anything else but the input files</li>
+</ul>
+</p>
+<p>Create the following flow</p>
+<img src="vision-annotate-files.png" style="height: 50%; width: 50%"/>
+<p>
+ Keep the default value of the JSON Payload property in
StartGcpVisionAnnotateFilesOperation
+</p>
+<p>
+ Execution steps:
+<ul>
+ <li>ListGCSBucket processor will return a list of files in the bucket at
the first run.</li>
+ <li>ListGCSBucket will return only new items at subsequent runs.</li>
+ <li>StartGcpVisionAnnotateFilesOperation processor will trigger GCP Vision
file annotation jobs based on the JSON payload.</li>
+ <li>StartGcpVisionAnnotateFilesOperation processor will populate the
<code>operationKey</code> flow file attribute.</li>
+ <li>GetGcpVisionAnnotateFilesOperationStatus processor will periodically
query status of the job.</li>
+</ul>
+</p>
+</body>
+</html>
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateFilesOperation/vision-annotate-files.png
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateFilesOperation/vision-annotate-files.png
new file mode 100644
index 0000000000..65635d2bfc
Binary files /dev/null and
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateFilesOperation/vision-annotate-files.png
differ
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateImagesOperation/additionalDetails.html
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateImagesOperation/additionalDetails.html
new file mode 100644
index 0000000000..e15796fcb0
--- /dev/null
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateImagesOperation/additionalDetails.html
@@ -0,0 +1,105 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<head>
+ <meta charset="utf-8"/>
+ <title>Google Vision</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css"
type="text/css"/>
+</head>
+<body>
+
+<h1>Google Cloud Vision - Start Annotate Images Operation</h1>
+<p>
+ Prerequisites
+<ul>
+ <li>Make sure Vision API is enabled and the account you are using has
the right to use it</li>
+ <li>Make sure the input image(s) are available in a GCS bucket</li>
+</ul>
+</p>
+<h3>Usage</h3>
+<p>
+ StartGcpVisionAnnotateImagesOperation is designed to trigger image
annotation operations. This processor should be used together with the
GetGcpVisionAnnotateImagesOperationStatus Processor.
+ Outgoing FlowFiles contain the raw response to the request returned by
the Vision server. The response is in JSON format and contains the result and
additional metadata as written in the Google Vision API Reference documents.
+</p>
+<h3>Payload</h3>
+<p>
+ The JSON Payload is a request in JSON format as documented in the <a
href="https://cloud.google.com/vision/docs/reference/rest/v1/images/asyncBatchAnnotate"
target="_blank">Google Vision REST API reference document</a>.
+ Payload can be fed to the processor via the <code>JSON Payload</code>
property or as FlowFile content. The property takes precedence over
FlowFile content.
+ Please make sure to delete the default value of the property if you
want to use FlowFile content payload.
+ A JSON payload template example:
+</p>
+
+<code>
+ <pre>
+{
+ "requests": [{
+ "image": {
+ "source": {
+ "imageUri": "gs://${gcs.bucket}/${filename}"
+ }
+ },
+ "features": [{
+ "type": "DOCUMENT_TEXT_DETECTION",
+ "maxResults": 4
+ }]
+ }],
+ "outputConfig": {
+ "gcsDestination": {
+ "uri": "gs://${gcs.bucket}/${filename}/"
+ },
+ "batchSize": 2
+ }
+}
+ </pre>
+</code>
+<h3>Feature types</h3>
+<ul>
+ <li>TEXT_DETECTION: Optical character recognition (OCR) for an image;
text recognition and conversion to machine-coded text. Identifies and extracts
UTF-8 text in an image.</li>
+ <li>DOCUMENT_TEXT_DETECTION: Optical character recognition (OCR) for a
file (PDF/TIFF) or dense text image; dense text recognition and conversion to
machine-coded text.</li>
+ <li>LANDMARK_DETECTION: Provides the name of the landmark, a confidence
score and a bounding box in the image for the landmark.</li>
+ <li>LOGO_DETECTION: Provides a textual description of the entity
identified, a confidence score, and a bounding polygon for the logo in the
file.</li>
+ <li>LABEL_DETECTION: Provides generalized labels for an image.</li>
+ <li>etc.</li>
+</ul>
+You can find more details at <a
href="https://cloud.google.com/vision/docs/features-list"
target="_blank">Google Vision Feature List</a>
+<h3>Example: How to set up a simple Annotate Image Flow</h3>
+<p>
+ Prerequisites
+</p>
+<p>
+<ul>
+ <li>Input image files should be available in a GCS bucket</li>
+ <li>This bucket must not contain anything else but the input image
files</li>
+</ul>
+</p>
+<p>Create the following flow</p>
+<img src="vision-annotate-images.png" style="height: 50%; width: 50%"/>
+<p>
+ Keep the default value of the JSON Payload property in
StartGcpVisionAnnotateImagesOperation
+</p>
+<p>
+ Execution steps:
+<ul>
+ <li>ListGCSBucket processor will return a list of files in the bucket
at the first run.</li>
+ <li>ListGCSBucket will return only new items at subsequent runs.</li>
+ <li>StartGcpVisionAnnotateImagesOperation processor will trigger GCP
Vision image annotation jobs based on the JSON payload.</li>
+ <li>StartGcpVisionAnnotateImagesOperation processor will populate the
<code>operationKey</code> flow file attribute.</li>
+ <li>GetGcpVisionAnnotateImagesOperationStatus processor will
periodically query status of the job.</li>
+</ul>
+</p>
+</body>
+</html>
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateImagesOperation/vision-annotate-images.png
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateImagesOperation/vision-annotate-images.png
new file mode 100644
index 0000000000..e4ed82c406
Binary files /dev/null and
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateImagesOperation/vision-annotate-images.png
differ
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/vision/GetGcpVisionAnnotateFilesOperationStatusTest.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/vision/GetGcpVisionAnnotateFilesOperationStatusTest.java
new file mode 100644
index 0000000000..b64d441d1c
--- /dev/null
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/vision/GetGcpVisionAnnotateFilesOperationStatusTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.vision;
+
+import static
org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+import static
org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.GCP_OPERATION_KEY;
+import static
org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.REL_FAILURE;
+import static
org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.REL_SUCCESS;
+import static
org.apache.nifi.processors.gcp.vision.AbstractGetGcpVisionAnnotateOperationStatus.REL_ORIGINAL;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.vision.v1.AsyncBatchAnnotateFilesResponse;
+import com.google.cloud.vision.v1.ImageAnnotatorClient;
+import com.google.longrunning.Operation;
+import com.google.longrunning.OperationsClient;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessageV3;
+import com.google.rpc.Status;
+import java.util.Collections;
+import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
+import
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class GetGcpVisionAnnotateFilesOperationStatusTest {
+ private static final String PLACEHOLDER_CONTENT = "content";
+ private static final String OPERATION_KEY = "operationKey";
+ private TestRunner runner = null;
+ private GetGcpVisionAnnotateFilesOperationStatus processor;
+ @Mock
+ private ImageAnnotatorClient mockVisionClient;
+ private GCPCredentialsService gcpCredentialsService;
+ @Mock
+ private OperationsClient operationClient;
+ @Mock
+ private Operation operation;
+
+ @BeforeEach
+ public void setUp() throws InitializationException {
+ gcpCredentialsService = new GCPCredentialsControllerService();
+ processor = new GetGcpVisionAnnotateFilesOperationStatus() {
+ @Override
+ protected ImageAnnotatorClient getVisionClient() {
+ return mockVisionClient;
+ }
+
+ @Override
+ protected GeneratedMessageV3 deserializeResponse(ByteString
responseValue) {
+ return AsyncBatchAnnotateFilesResponse.newBuilder().build();
+ }
+ };
+ runner = TestRunners.newTestRunner(processor);
+ runner.addControllerService("gcp-credentials-provider-service-id",
gcpCredentialsService);
+ runner.enableControllerService(gcpCredentialsService);
+ runner.setProperty(GCP_CREDENTIALS_PROVIDER_SERVICE,
"gcp-credentials-provider-service-id");
+ runner.assertValid(gcpCredentialsService);
+ }
+
+ @Test
+ public void testGetAnnotateFilesJobStatusSuccess() {
+
when(mockVisionClient.getOperationsClient()).thenReturn(operationClient);
+
when(operationClient.getOperation(OPERATION_KEY)).thenReturn(operation);
+ when(operation.getResponse()).thenReturn(Any.newBuilder().build());
+ when(operation.getDone()).thenReturn(true);
+ when(operation.hasError()).thenReturn(false);
+ runner.enqueue(PLACEHOLDER_CONTENT,
Collections.singletonMap(GCP_OPERATION_KEY, OPERATION_KEY));
+ runner.run();
+
+ runner.assertTransferCount(REL_SUCCESS, 1);
+ runner.assertTransferCount(REL_ORIGINAL, 1);
+ }
+
+ @Test
+ public void testGetAnnotateFilesJobStatusInProgress() {
+
when(mockVisionClient.getOperationsClient()).thenReturn(operationClient);
+
when(operationClient.getOperation(OPERATION_KEY)).thenReturn(operation);
+ when(operation.getDone()).thenReturn(true);
+ when(operation.hasError()).thenReturn(true);
+ runner.enqueue(PLACEHOLDER_CONTENT,
Collections.singletonMap(GCP_OPERATION_KEY, OPERATION_KEY));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(REL_FAILURE, 1);
+ }
+
+ @Test
+ public void testGetAnnotateImagesJobStatusFailed() {
+
when(mockVisionClient.getOperationsClient()).thenReturn(operationClient);
+
when(operationClient.getOperation(OPERATION_KEY)).thenReturn(operation);
+ when(operation.getDone()).thenReturn(true);
+ when(operation.hasError()).thenReturn(true);
+ when(operation.getError()).thenReturn(Status.newBuilder().build());
+ runner.enqueue(PLACEHOLDER_CONTENT,
Collections.singletonMap(GCP_OPERATION_KEY, OPERATION_KEY));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(REL_FAILURE, 1);
+ }
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/vision/GetGcpVisionAnnotateImagesOperationStatusTest.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/vision/GetGcpVisionAnnotateImagesOperationStatusTest.java
new file mode 100644
index 0000000000..d4576ca1d4
--- /dev/null
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/vision/GetGcpVisionAnnotateImagesOperationStatusTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.vision;
+
+import static
org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+import static
org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.GCP_OPERATION_KEY;
+import static
org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.REL_FAILURE;
+import static
org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.REL_SUCCESS;
+import static
org.apache.nifi.processors.gcp.vision.AbstractGetGcpVisionAnnotateOperationStatus.REL_ORIGINAL;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.vision.v1.AsyncBatchAnnotateImagesResponse;
+import com.google.cloud.vision.v1.ImageAnnotatorClient;
+import com.google.longrunning.Operation;
+import com.google.longrunning.OperationsClient;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessageV3;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.rpc.Status;
+import java.util.Collections;
+import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
+import
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class GetGcpVisionAnnotateImagesOperationStatusTest {
+ private static final String PLACEHOLDER_CONTENT = "content";
+ private static final String OPERATION_KEY = "operationKey";
+ private TestRunner runner = null;
+ private GetGcpVisionAnnotateImagesOperationStatus processor;
+ @Mock
+ private ImageAnnotatorClient mockVisionClient;
+ private GCPCredentialsService gcpCredentialsService;
+ @Mock
+ private OperationsClient operationClient;
+ @Mock
+ private Operation operation;
+
+ @BeforeEach
+ public void setUp() throws InitializationException {
+ gcpCredentialsService = new GCPCredentialsControllerService();
+ processor = new GetGcpVisionAnnotateImagesOperationStatus() {
+ @Override
+ protected ImageAnnotatorClient getVisionClient() {
+ return mockVisionClient;
+ }
+
+ @Override
+ protected GeneratedMessageV3 deserializeResponse(ByteString
responseValue) throws InvalidProtocolBufferException {
+ return AsyncBatchAnnotateImagesResponse.newBuilder().build();
+ }
+ };
+ runner = TestRunners.newTestRunner(processor);
+ runner.addControllerService("gcp-credentials-provider-service-id",
gcpCredentialsService);
+ runner.enableControllerService(gcpCredentialsService);
+ runner.setProperty(GCP_CREDENTIALS_PROVIDER_SERVICE,
"gcp-credentials-provider-service-id");
+ runner.assertValid(gcpCredentialsService);
+ }
+
+ @Test
+ public void testGetAnnotateImagesJobStatusSuccess() {
+
when(mockVisionClient.getOperationsClient()).thenReturn(operationClient);
+
when(operationClient.getOperation(OPERATION_KEY)).thenReturn(operation);
+ when(operation.getResponse()).thenReturn(Any.newBuilder().build());
+ when(operation.getDone()).thenReturn(true);
+ when(operation.hasError()).thenReturn(false);
+ runner.enqueue(PLACEHOLDER_CONTENT,
Collections.singletonMap(GCP_OPERATION_KEY, OPERATION_KEY));
+ runner.run();
+
+ runner.assertTransferCount(REL_SUCCESS, 1);
+ runner.assertTransferCount(REL_ORIGINAL, 1);
+ }
+
+ @Test
+ public void testGetAnnotateImagesJobStatusInProgress() {
+
when(mockVisionClient.getOperationsClient()).thenReturn(operationClient);
+
when(operationClient.getOperation(OPERATION_KEY)).thenReturn(operation);
+ when(operation.getDone()).thenReturn(true);
+ when(operation.hasError()).thenReturn(true);
+ runner.enqueue(PLACEHOLDER_CONTENT,
Collections.singletonMap(GCP_OPERATION_KEY, OPERATION_KEY));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(REL_FAILURE, 1);
+ }
+
+ @Test
+ public void testGetAnnotateImagesJobStatusFailed() {
+
when(mockVisionClient.getOperationsClient()).thenReturn(operationClient);
+
when(operationClient.getOperation(OPERATION_KEY)).thenReturn(operation);
+ when(operation.getDone()).thenReturn(true);
+ when(operation.hasError()).thenReturn(true);
+ when(operation.getError()).thenReturn(Status.newBuilder().build());
+ runner.enqueue(PLACEHOLDER_CONTENT,
Collections.singletonMap(GCP_OPERATION_KEY, OPERATION_KEY));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(REL_FAILURE, 1);
+ }
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/vision/StartGcpVisionAnnotateFilesOperationTest.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/vision/StartGcpVisionAnnotateFilesOperationTest.java
new file mode 100644
index 0000000000..4f5ccb9a56
--- /dev/null
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/vision/StartGcpVisionAnnotateFilesOperationTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.vision;
+
+import static
org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+import static
org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.GCP_OPERATION_KEY;
+import static
org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.REL_FAILURE;
+import static
org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.REL_SUCCESS;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.api.gax.longrunning.OperationSnapshot;
+import com.google.cloud.vision.v1.AsyncBatchAnnotateFilesRequest;
+import com.google.cloud.vision.v1.AsyncBatchAnnotateFilesResponse;
+import com.google.cloud.vision.v1.ImageAnnotatorClient;
+import com.google.cloud.vision.v1.OperationMetadata;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
+import
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class StartGcpVisionAnnotateFilesOperationTest {
+ private TestRunner runner = null;
+ private StartGcpVisionAnnotateFilesOperation processor;
+ private static final Path FlowFileContent =
Paths.get("src/test/resources/vision/annotate-image.json");
+ @Mock
+ private ImageAnnotatorClient vision;
+ @Captor
+ private ArgumentCaptor<AsyncBatchAnnotateFilesRequest> requestCaptor;
+ private String operationName = "operationName";
+ @Mock
+ private OperationFuture<AsyncBatchAnnotateFilesResponse,
OperationMetadata> operationFuture;
+ @Mock
+ private ApiFuture<OperationSnapshot> apiFuture;
+ @Mock
+ private ImageAnnotatorClient mockVisionClient;
+ private GCPCredentialsService gcpCredentialsService;
+ @Mock
+ private OperationSnapshot operationSnapshot;
+
+ @BeforeEach
+ public void setUp() throws InitializationException {
+ gcpCredentialsService = new GCPCredentialsControllerService();
+ processor = new StartGcpVisionAnnotateFilesOperation() {
+ @Override
+ protected ImageAnnotatorClient getVisionClient() {
+ return mockVisionClient;
+ }
+ };
+ runner = TestRunners.newTestRunner(processor);
+ runner.addControllerService("gcp-credentials-provider-service-id",
gcpCredentialsService);
+ runner.enableControllerService(gcpCredentialsService);
+ runner.setProperty(GCP_CREDENTIALS_PROVIDER_SERVICE,
"gcp-credentials-provider-service-id");
+ runner.assertValid(gcpCredentialsService);
+ }
+
+ @Test
+ public void testAnnotateFilesJob() throws ExecutionException,
InterruptedException, IOException {
+
when(mockVisionClient.asyncBatchAnnotateFilesAsync((AsyncBatchAnnotateFilesRequest)
any())).thenReturn(operationFuture);
+ when(operationFuture.getName()).thenReturn(operationName);
+ runner.enqueue(FlowFileContent, Collections.emptyMap());
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS);
+ runner.assertAllFlowFilesContainAttribute(REL_SUCCESS,
GCP_OPERATION_KEY);
+ }
+
+ @Test
+ public void testAnnotateFilesJobFail() throws IOException {
+
when(mockVisionClient.asyncBatchAnnotateFilesAsync((AsyncBatchAnnotateFilesRequest)any())).thenThrow(new
RuntimeException("ServiceError"));
+ runner.enqueue(FlowFileContent, Collections.emptyMap());
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_FAILURE);
+ }
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/vision/StartGcpVisionAnnotateImagesOperationTest.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/vision/StartGcpVisionAnnotateImagesOperationTest.java
new file mode 100644
index 0000000000..9ce1eac470
--- /dev/null
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/vision/StartGcpVisionAnnotateImagesOperationTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.vision;
+
+import static
org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+import static
org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.GCP_OPERATION_KEY;
+import static
org.apache.nifi.processors.gcp.vision.AbstractGcpVisionProcessor.REL_SUCCESS;
+import static
org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateImagesOperation.JSON_PAYLOAD;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.api.gax.longrunning.OperationSnapshot;
+import com.google.cloud.vision.v1.ImageAnnotatorClient;
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import org.apache.commons.io.FileUtils;
+import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
+import
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for {@link StartGcpVisionAnnotateImagesOperation} that run the processor against a mocked
+ * {@link ImageAnnotatorClient}. The request JSON is supplied through the JSON_PAYLOAD property,
+ * loaded from the annotate-image.json test resource.
+ */
+@ExtendWith(MockitoExtension.class)
+public class StartGcpVisionAnnotateImagesOperationTest {
+    private TestRunner runner = null;
+    private StartGcpVisionAnnotateImagesOperation processor;
+    private String operationName = "operationName";
+    @Mock
+    private OperationFuture operationFuture;
+    // NOTE(review): apiFuture and operationSnapshot are not referenced by any test in this class.
+    @Mock
+    private ApiFuture<OperationSnapshot> apiFuture;
+    @Mock
+    private ImageAnnotatorClient mockVisionClient;
+    private GCPCredentialsService gcpCredentialsService;
+    @Mock
+    private OperationSnapshot operationSnapshot;
+    private String jsonPayloadValue;
+
+    /**
+     * Builds a runner around a processor whose client accessor returns the mocked client,
+     * registers the credentials service and sets the JSON payload property.
+     */
+    @BeforeEach
+    public void setUp() throws InitializationException, IOException {
+        jsonPayloadValue = FileUtils.readFileToString(new File("src/test/resources/vision/annotate-image.json"), "UTF-8");
+        gcpCredentialsService = new GCPCredentialsControllerService();
+        processor = new StartGcpVisionAnnotateImagesOperation() {
+            @Override
+            protected ImageAnnotatorClient getVisionClient() {
+                return mockVisionClient;
+            }
+        };
+        runner = TestRunners.newTestRunner(processor);
+        runner.addControllerService("gcp-credentials-provider-service-id", gcpCredentialsService);
+        runner.enableControllerService(gcpCredentialsService);
+        runner.setProperty(GCP_CREDENTIALS_PROVIDER_SERVICE, "gcp-credentials-provider-service-id");
+        runner.assertValid(gcpCredentialsService);
+        runner.setProperty(JSON_PAYLOAD, jsonPayloadValue);
+    }
+
+    /**
+     * Success path: the mocked client returns an operation future, and the resulting FlowFile
+     * must be routed to success carrying the operation key attribute.
+     */
+    @Test
+    public void testAnnotateImageJob() throws ExecutionException, InterruptedException, IOException {
+        when(mockVisionClient.asyncBatchAnnotateImagesAsync(any())).thenReturn(operationFuture);
+        when(operationFuture.getName()).thenReturn(operationName);
+
+        runner.run();
+
+        // Pin the count so the assertions cannot pass on an empty result.
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        runner.assertAllFlowFilesContainAttribute(REL_SUCCESS, GCP_OPERATION_KEY);
+    }
+
+    /**
+     * Failure path: the mocked client throws when the request is submitted, and the FlowFile
+     * must be routed to failure. This replaces a verbatim duplicate of the success test that
+     * was named after the files processor.
+     */
+    @Test
+    public void testAnnotateImageJobFail() {
+        when(mockVisionClient.asyncBatchAnnotateImagesAsync(any())).thenThrow(new RuntimeException("ServiceError"));
+
+        // Enqueue the payload so there is a concrete FlowFile whose routing can be asserted.
+        runner.enqueue(jsonPayloadValue);
+        runner.run();
+
+        // Same package, so the relationship is reachable without an additional import.
+        runner.assertAllFlowFilesTransferred(AbstractGcpVisionProcessor.REL_FAILURE, 1);
+    }
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/vision/annotate-file.json
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/vision/annotate-file.json
new file mode 100644
index 0000000000..ff5c3128ac
--- /dev/null
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/vision/annotate-file.json
@@ -0,0 +1,21 @@
+{
+ "requests": [
+ {
+ "inputConfig": {
+ "gcsSource": {
+ "uri": "gs://qe-dim-external/kjantner-vision-test/TestDoc.pdf"
+ },
+ "mimeType": "application/pdf"
+ },
+ "features": [{
+ "type": "DOCUMENT_TEXT_DETECTION",
+ "maxResults": 4
+ }],
+ "outputConfig": {
+ "gcsDestination": {
+ "uri": "gs://qe-dim-external/kjantner-vision-test/results-files/"
+ },
+ "batchSize": 2
+ }
+ }]
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/vision/annotate-image.json
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/vision/annotate-image.json
new file mode 100644
index 0000000000..510d8052e5
--- /dev/null
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/vision/annotate-image.json
@@ -0,0 +1,19 @@
+{
+ "requests": [{
+ "image": {
+ "source": {
+ "imageUri": "gs://qe-dim-external/kjantner-vision-test/vision-test.png"
+ }
+ },
+ "features": [{
+ "type": "FACE_DETECTION",
+ "maxResults": 4
+ }]
+ }],
+ "outputConfig": {
+ "gcsDestination": {
+ "uri": "gs://qe-dim-external/kjantner-vision-test/results/"
+ },
+ "batchSize": 2
+ }
+}
\ No newline at end of file