This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 99610f09a4 NIFI-11553 Add Record handling and more Properties for GCP
PubSub
99610f09a4 is described below
commit 99610f09a40513b29574c175b4fbcf6bba4d7294
Author: Paul Grey <[email protected]>
AuthorDate: Tue May 16 09:38:39 2023 -0400
NIFI-11553 Add Record handling and more Properties for GCP PubSub
- Added Message Derivation Strategy to PublishGCPubSub with FlowFile and
Record options
- Added API Endpoint property to PublishGCPubSub and ConsumeGCPubSub
- Added Batch configuration properties
This closes #7274
Signed-off-by: David Handermann <[email protected]>
(cherry picked from commit 336b857442109e1f2c8b3b6108106514609159f5)
---
.../gcp/pubsub/AbstractGCPubSubProcessor.java | 39 ++-
.../processors/gcp/pubsub/ConsumeGCPubSub.java | 13 +-
.../processors/gcp/pubsub/PubSubAttributes.java | 3 +
.../processors/gcp/pubsub/PublishGCPubSub.java | 320 ++++++++++++++++-----
.../gcp/pubsub/lite/PublishGCPubSubLite.java | 24 +-
.../gcp/pubsub/publish/FlowFileResult.java | 143 +++++++++
.../pubsub/publish/MessageDerivationStrategy.java | 53 ++++
.../pubsub/publish/TrackedApiFutureCallback.java | 45 +++
.../processors/gcp/pubsub/ConsumeGCPubSubIT.java | 6 +-
.../processors/gcp/pubsub/PublishGCPubSubIT.java | 4 +-
.../processors/gcp/pubsub/PublishGCPubSubTest.java | 233 +++++++++++++++
.../gcp/pubsub/lite/PublishGCPubSubLiteTest.java | 65 +++++
.../src/test/resources/pubsub/records.json | 17 ++
13 files changed, 859 insertions(+), 106 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
index fc135dba81..6602a0e0c8 100644
---
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
@@ -19,9 +19,11 @@ package org.apache.nifi.processors.gcp.pubsub;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.ServiceOptions;
+import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.VerifiableProcessor;
@@ -36,9 +38,9 @@ import java.util.Set;
public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor
implements VerifiableProcessor {
- public static final PropertyDescriptor BATCH_SIZE = new
PropertyDescriptor.Builder()
+ public static final PropertyDescriptor BATCH_SIZE_THRESHOLD = new
PropertyDescriptor.Builder()
.name("gcp-pubsub-publish-batch-size")
- .displayName("Batch Size")
+ .displayName("Batch Size Threshold")
.description("Indicates the number of messages the cloud service
should bundle together in a batch. If not set and left empty, only one message
" +
"will be used in a batch")
.required(true)
@@ -46,6 +48,39 @@ public abstract class AbstractGCPubSubProcessor extends
AbstractGCPProcessor imp
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
+ public static final PropertyDescriptor BATCH_BYTES_THRESHOLD = new
PropertyDescriptor.Builder()
+ .name("gcp-batch-bytes")
+ .displayName("Batch Bytes Threshold")
+ .description("Publish request gets triggered based on this Batch
Bytes Threshold property and"
+ + " the " + BATCH_SIZE_THRESHOLD.getDisplayName() + "
property, whichever condition is met first.")
+ .required(true)
+ .defaultValue("3 MB")
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor BATCH_DELAY_THRESHOLD = new
PropertyDescriptor.Builder()
+ .name("gcp-pubsub-publish-batch-delay")
+ .displayName("Batch Delay Threshold")
+ .description("Indicates the delay threshold to use for batching.
After this amount of time has elapsed " +
+ "(counting from the first element added), the elements
will be wrapped up in a batch and sent. " +
+ "This value should not be set too high, usually on the
order of milliseconds. Otherwise, calls " +
+ "might appear to never complete.")
+ .required(true)
+ .defaultValue("100 millis")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor API_ENDPOINT = new
PropertyDescriptor
+ .Builder().name("api-endpoint")
+ .displayName("API Endpoint")
+ .description("Override the gRPC endpoint in the form of
[host:port]")
+ .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(true)
+ .defaultValue(PublisherStubSettings.getDefaultEndpoint()) //
identical to SubscriberStubSettings.getDefaultEndpoint()
+ .build();
+
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles are routed to this relationship after a
successful Google Cloud Pub/Sub operation.")
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
index 3928a2ae63..77d9fcac87 100644
---
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
@@ -102,7 +102,7 @@ public class ConsumeGCPubSub extends
AbstractGCPubSubWithProxyProcessor {
@OnScheduled
public void onScheduled(ProcessContext context) {
- final Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ final Integer batchSize =
context.getProperty(BATCH_SIZE_THRESHOLD).asInteger();
pullRequest = PullRequest.newBuilder()
.setMaxMessages(batchSize)
@@ -192,7 +192,8 @@ public class ConsumeGCPubSub extends
AbstractGCPubSubWithProxyProcessor {
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new
ArrayList<>(super.getSupportedPropertyDescriptors());
descriptors.add(SUBSCRIPTION);
- descriptors.add(BATCH_SIZE);
+ descriptors.add(BATCH_SIZE_THRESHOLD);
+ descriptors.add(API_ENDPOINT);
return Collections.unmodifiableList(descriptors);
}
@@ -268,11 +269,13 @@ public class ConsumeGCPubSub extends
AbstractGCPubSubWithProxyProcessor {
}
private SubscriberStub getSubscriber(final ProcessContext context) throws
IOException {
- final SubscriberStubSettings subscriberStubSettings =
SubscriberStubSettings.newBuilder()
+        // API Endpoint supports Variable Registry expressions, so evaluate it before handing it to the stub builder
+        final String endpoint = context.getProperty(API_ENDPOINT).evaluateAttributeExpressions().getValue();
+
+ final SubscriberStubSettings.Builder subscriberBuilder =
SubscriberStubSettings.newBuilder()
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
.setTransportChannelProvider(getTransportChannelProvider(context))
- .build();
+ .setEndpoint(endpoint);
- return GrpcSubscriberStub.create(subscriberStubSettings);
+ return GrpcSubscriberStub.create(subscriberBuilder.build());
}
}
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java
index cfcdda5649..b9654112d4 100644
---
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java
@@ -21,6 +21,9 @@ public class PubSubAttributes {
public static final String MESSAGE_ID_ATTRIBUTE = "gcp.pubsub.messageId";
public static final String MESSAGE_ID_DESCRIPTION = "ID of the pubsub
message published to the configured Google Cloud PubSub topic";
+ public static final String RECORDS_ATTRIBUTE = "gcp.pubsub.count.records";
+ public static final String RECORDS_DESCRIPTION = "Count of pubsub messages
published to the configured Google Cloud PubSub topic";
+
public static final String TOPIC_NAME_ATTRIBUTE = "gcp.pubsub.topic";
public static final String TOPIC_NAME_DESCRIPTION = "Name of the Google
Cloud PubSub topic the message was published to";
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
index 86545ab5fd..f2d5ed7ebd 100644
---
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
@@ -17,13 +17,15 @@
package org.apache.nifi.processors.gcp.pubsub;
import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.ApiException;
-import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.stub.GrpcPublisherStub;
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.iam.v1.TestIamPermissionsRequest;
import com.google.iam.v1.TestIamPermissionsResponse;
import com.google.protobuf.ByteString;
@@ -48,11 +50,27 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.pubsub.publish.FlowFileResult;
+import org.apache.nifi.processors.gcp.pubsub.publish.MessageDerivationStrategy;
+import org.apache.nifi.processors.gcp.pubsub.publish.TrackedApiFutureCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.StopWatch;
+import org.threeten.bp.Duration;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -64,12 +82,13 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.RECORDS_ATTRIBUTE;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.RECORDS_DESCRIPTION;
import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE;
import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION;
@@ -82,12 +101,63 @@ import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_
description = "Attributes to be set for the outgoing Google Cloud
PubSub message", expressionLanguageScope =
ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@WritesAttributes({
@WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description =
MESSAGE_ID_DESCRIPTION),
+ @WritesAttribute(attribute = RECORDS_ATTRIBUTE, description =
RECORDS_DESCRIPTION),
@WritesAttribute(attribute = TOPIC_NAME_ATTRIBUTE, description =
TOPIC_NAME_DESCRIPTION)
})
@SystemResourceConsideration(resource = SystemResource.MEMORY, description =
"The entirety of the FlowFile's content "
+ "will be read into memory to be sent as a PubSub message.")
public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor {
private static final List<String> REQUIRED_PERMISSIONS =
Collections.singletonList("pubsub.topics.publish");
+ private static final String TRANSIT_URI_FORMAT_STRING = "gcp://%s";
+
+    /**
+     * Maximum number of FlowFiles pulled from the session on each onTrigger invocation.
+     * Validated as a positive integer because the value is read with asInteger() and passed to session.get(int).
+     */
+    public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Input Batch Size")
+            .displayName("Input Batch Size")
+            .description("Maximum number of FlowFiles processed for each Processor invocation")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("100")
+            .build();
+
+ public static final PropertyDescriptor MESSAGE_DERIVATION_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("Message Derivation Strategy")
+ .displayName("Message Derivation Strategy")
+ .description("The strategy used to publish the incoming FlowFile
to the Google Cloud PubSub endpoint.")
+ .required(true)
+
.defaultValue(MessageDerivationStrategy.FLOWFILE_ORIENTED.getValue())
+ .allowableValues(MessageDerivationStrategy.class)
+ .build();
+
+ public static final PropertyDescriptor RECORD_READER = new
PropertyDescriptor.Builder()
+ .name("Record Reader")
+ .displayName("Record Reader")
+ .description("The Record Reader to use for incoming FlowFiles")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .dependsOn(MESSAGE_DERIVATION_STRATEGY,
MessageDerivationStrategy.RECORD_ORIENTED.getValue())
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor RECORD_WRITER = new
PropertyDescriptor.Builder()
+ .name("Record Writer")
+ .displayName("Record Writer")
+ .description("The Record Writer to use in order to serialize the
data before sending to GCPubSub endpoint")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .dependsOn(MESSAGE_DERIVATION_STRATEGY,
MessageDerivationStrategy.RECORD_ORIENTED.getValue())
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor MAX_MESSAGE_SIZE = new
PropertyDescriptor.Builder()
+ .name("Maximum Message Size")
+ .displayName("Maximum Message Size")
+ .description("The maximum size of a Google PubSub message in
bytes. Defaults to 1 MB (1048576 bytes)")
+ .dependsOn(MESSAGE_DERIVATION_STRATEGY,
MessageDerivationStrategy.FLOWFILE_ORIENTED.getValue())
+ .required(true)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("1 MB")
+ .build();
public static final PropertyDescriptor TOPIC_NAME = new
PropertyDescriptor.Builder()
.name("gcp-pubsub-topic")
@@ -103,14 +173,21 @@ public class PublishGCPubSub extends
AbstractGCPubSubWithProxyProcessor {
.description("FlowFiles are routed to this relationship if the
Google Cloud Pub/Sub operation fails but attempting the operation again may
succeed.")
.build();
- private Publisher publisher = null;
- private final AtomicReference<Exception> storedException = new
AtomicReference<>();
+ protected Publisher publisher = null;
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new
ArrayList<>(super.getSupportedPropertyDescriptors());
+ descriptors.add(MAX_BATCH_SIZE);
+ descriptors.add(MESSAGE_DERIVATION_STRATEGY);
+ descriptors.add(RECORD_READER);
+ descriptors.add(RECORD_WRITER);
+ descriptors.add(MAX_MESSAGE_SIZE);
descriptors.add(TOPIC_NAME);
- descriptors.add(BATCH_SIZE);
+ descriptors.add(BATCH_SIZE_THRESHOLD);
+ descriptors.add(BATCH_BYTES_THRESHOLD);
+ descriptors.add(BATCH_DELAY_THRESHOLD);
+ descriptors.add(API_ENDPOINT);
return Collections.unmodifiableList(descriptors);
}
@@ -139,8 +216,7 @@ public class PublishGCPubSub extends
AbstractGCPubSubWithProxyProcessor {
try {
publisher = getPublisherBuilder(context).build();
} catch (IOException e) {
- getLogger().error("Failed to create Google Cloud PubSub Publisher
due to {}", new Object[]{e});
- storedException.set(e);
+ throw new ProcessException("Failed to create Google Cloud PubSub
Publisher", e);
}
}
@@ -171,25 +247,26 @@ public class PublishGCPubSub extends
AbstractGCPubSubWithProxyProcessor {
.setTransportChannelProvider(getTransportChannelProvider(context))
.build();
- final GrpcPublisherStub publisherStub =
GrpcPublisherStub.create(publisherStubSettings);
- final String topicName =
context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
- final TestIamPermissionsRequest request =
TestIamPermissionsRequest.newBuilder()
- .addAllPermissions(REQUIRED_PERMISSIONS)
- .setResource(topicName)
- .build();
- final TestIamPermissionsResponse response =
publisherStub.testIamPermissionsCallable().call(request);
- if (response.getPermissionsCount() >=
REQUIRED_PERMISSIONS.size()) {
- results.add(new ConfigVerificationResult.Builder()
- .verificationStepName("Test IAM Permissions")
-
.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
- .explanation(String.format("Verified Topic [%s]
exists and the configured user has the correct permissions.", topicName))
- .build());
- } else {
- results.add(new ConfigVerificationResult.Builder()
- .verificationStepName("Test IAM Permissions")
- .outcome(ConfigVerificationResult.Outcome.FAILED)
- .explanation(String.format("The configured user
does not have the correct permissions on Topic [%s].", topicName))
- .build());
+ try (GrpcPublisherStub publisherStub =
GrpcPublisherStub.create(publisherStubSettings)) {
+ final String topicName =
context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
+ final TestIamPermissionsRequest request =
TestIamPermissionsRequest.newBuilder()
+ .addAllPermissions(REQUIRED_PERMISSIONS)
+ .setResource(topicName)
+ .build();
+ final TestIamPermissionsResponse response =
publisherStub.testIamPermissionsCallable().call(request);
+ if (response.getPermissionsCount() >=
REQUIRED_PERMISSIONS.size()) {
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Test IAM Permissions")
+
.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
+ .explanation(String.format("Verified Topic
[%s] exists and the configured user has the correct permissions.", topicName))
+ .build());
+ } else {
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Test IAM Permissions")
+
.outcome(ConfigVerificationResult.Outcome.FAILED)
+ .explanation(String.format("The configured
user does not have the correct permissions on Topic [%s].", topicName))
+ .build());
+ }
}
} catch (final ApiException e) {
verificationLogger.error("The configured user appears to have
the correct permissions, but the following error was encountered", e);
@@ -213,62 +290,143 @@ public class PublishGCPubSub extends
AbstractGCPubSubWithProxyProcessor {
@Override
public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
- final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger();
- final List<FlowFile> flowFiles = session.get(flowFileCount);
-
- if (flowFiles.isEmpty() || publisher == null) {
- if (storedException.get() != null) {
- getLogger().error("Google Cloud PubSub Publisher was not
properly created due to {}", new Object[]{storedException.get()});
- }
+ final StopWatch stopWatch = new StopWatch(true);
+ final MessageDerivationStrategy inputStrategy =
MessageDerivationStrategy.valueOf(context.getProperty(MESSAGE_DERIVATION_STRATEGY).getValue());
+ final int maxBatchSize =
context.getProperty(MAX_BATCH_SIZE).asInteger();
+ final List<FlowFile> flowFileBatch = session.get(maxBatchSize);
+ if (flowFileBatch.isEmpty()) {
context.yield();
- return;
+ } else if
(MessageDerivationStrategy.FLOWFILE_ORIENTED.equals(inputStrategy)) {
+ onTriggerFlowFileStrategy(context, session, stopWatch,
flowFileBatch);
+ } else if
(MessageDerivationStrategy.RECORD_ORIENTED.equals(inputStrategy)) {
+ onTriggerRecordStrategy(context, session, stopWatch,
flowFileBatch);
+ } else {
+ throw new IllegalStateException(inputStrategy.getValue());
}
+ }
- final long startNanos = System.nanoTime();
- final List<FlowFile> successfulFlowFiles = new ArrayList<>();
- final String topicName = getTopicName(context).toString();
+ private void onTriggerFlowFileStrategy(
+ final ProcessContext context,
+ final ProcessSession session,
+ final StopWatch stopWatch,
+ final List<FlowFile> flowFileBatch) throws ProcessException {
+ final long maxMessageSize =
context.getProperty(MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).longValue();
- try {
- for (FlowFile flowFile : flowFiles) {
- try {
- final ByteArrayOutputStream baos = new
ByteArrayOutputStream();
- session.exportTo(flowFile, baos);
- final ByteString flowFileContent =
ByteString.copyFrom(baos.toByteArray());
-
- PubsubMessage message =
PubsubMessage.newBuilder().setData(flowFileContent)
- .setPublishTime(Timestamp.newBuilder().build())
- .putAllAttributes(getDynamicAttributesMap(context,
flowFile))
- .build();
+ final Executor executor = MoreExecutors.directExecutor();
+ final List<FlowFileResult> flowFileResults = new ArrayList<>();
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ApiFuture<String> messageIdFuture =
publisher.publish(message);
+        for (final FlowFile flowFile : flowFileBatch) {
+            final List<ApiFuture<String>> futures = new ArrayList<>();
+            // TrackedApiFutureCallback is registered with a direct executor, so it runs on the thread that
+            // completes the publish future, not this one; the lists are therefore shared across threads.
+            final List<String> successes = Collections.synchronizedList(new ArrayList<>());
+            final List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());
- final Map<String, String> attributes = new HashMap<>();
- attributes.put(MESSAGE_ID_ATTRIBUTE,
messageIdFuture.get());
- attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
+ if (flowFile.getSize() > maxMessageSize) {
+ final String message = String.format("FlowFile size %d exceeds
MAX_MESSAGE_SIZE", flowFile.getSize());
+ failures.add(new IllegalArgumentException(message));
+ flowFileResults.add(new FlowFileResult(flowFile, futures,
successes, failures));
+ } else {
+ baos.reset();
+ session.exportTo(flowFile, baos);
- flowFile = session.putAllAttributes(flowFile, attributes);
- successfulFlowFiles.add(flowFile);
- } catch (InterruptedException | ExecutionException e) {
- if (e.getCause() instanceof DeadlineExceededException) {
- getLogger().error("Failed to publish the message to
Google Cloud PubSub topic '{}' due to {} but attempting again may succeed " +
- "so routing to retry", new
Object[]{topicName, e.getLocalizedMessage()}, e);
- session.transfer(flowFile, REL_RETRY);
- } else {
- getLogger().error("Failed to publish the message to
Google Cloud PubSub topic '{}'", topicName, e);
- session.transfer(flowFile, REL_FAILURE);
- }
- context.yield();
- }
+ final ApiFuture<String> apiFuture = publishOneMessage(context,
flowFile, baos.toByteArray());
+ futures.add(apiFuture);
+ addCallback(apiFuture, new TrackedApiFutureCallback(successes,
failures), executor);
+ flowFileResults.add(new FlowFileResult(flowFile, futures,
successes, failures));
}
- } finally {
- if (!successfulFlowFiles.isEmpty()) {
- session.transfer(successfulFlowFiles, REL_SUCCESS);
- for (FlowFile flowFile : successfulFlowFiles) {
- final long transmissionMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- session.getProvenanceReporter().send(flowFile, topicName,
transmissionMillis);
- }
+ }
+ finishBatch(session, stopWatch, flowFileResults);
+ }
+
+ private void onTriggerRecordStrategy(
+ final ProcessContext context,
+ final ProcessSession session,
+ final StopWatch stopWatch,
+ final List<FlowFile> flowFileBatch) throws ProcessException {
+ try {
+ onTriggerRecordStrategyPublishRecords(context, session, stopWatch,
flowFileBatch);
+ } catch (IOException | SchemaNotFoundException |
MalformedRecordException e) {
+ throw new ProcessException("Record publishing failed", e);
+ }
+ }
+
+    /**
+     * Publishes every record of every FlowFile in the batch as an individual PubSub message, then routes the
+     * FlowFiles through finishBatch.
+     *
+     * @param context       processor context supplying the Record Reader and Record Writer services
+     * @param session       session used to read FlowFile content and transfer results
+     * @param stopWatch     started when onTrigger began; used for provenance timing
+     * @param flowFileBatch FlowFiles pulled for this invocation
+     * @throws IOException              if reading or serializing records fails
+     * @throws SchemaNotFoundException  if the reader or writer schema cannot be resolved
+     * @throws MalformedRecordException if the FlowFile content cannot be parsed as records
+     */
+    private void onTriggerRecordStrategyPublishRecords(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch)
+            throws ProcessException, IOException, SchemaNotFoundException, MalformedRecordException {
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        final Executor executor = MoreExecutors.directExecutor();
+        final List<FlowFileResult> flowFileResults = new ArrayList<>();
+        // Reused per record: publishOneRecord resets it before serializing each record
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        for (final FlowFile flowFile : flowFileBatch) {
+            final List<ApiFuture<String>> futures = new ArrayList<>();
+            // Mutated by TrackedApiFutureCallback on the thread completing each publish future
+            final List<String> successes = Collections.synchronizedList(new ArrayList<>());
+            final List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());
+
+            final Map<String, String> attributes = flowFile.getAttributes();
+            // Close the reader, and with it the InputStream obtained from session.read(), once all records are published
+            try (final RecordReader reader = readerFactory.createRecordReader(
+                    attributes, session.read(flowFile), flowFile.getSize(), getLogger())) {
+                final RecordSet recordSet = reader.createRecordSet();
+                final RecordSchema schema = writerFactory.getSchema(attributes, recordSet.getSchema());
+
+                final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, baos, attributes);
+                final PushBackRecordSet pushBackRecordSet = new PushBackRecordSet(recordSet);
+
+                while (pushBackRecordSet.isAnotherRecord()) {
+                    final ApiFuture<String> apiFuture = publishOneRecord(context, flowFile, baos, writer, pushBackRecordSet.next());
+                    futures.add(apiFuture);
+                    addCallback(apiFuture, new TrackedApiFutureCallback(successes, failures), executor);
}
+            }
+            flowFileResults.add(new FlowFileResult(flowFile, futures, successes, failures));
}
+        finishBatch(session, stopWatch, flowFileResults);
+    }
+
+ private ApiFuture<String> publishOneRecord(
+ final ProcessContext context,
+ final FlowFile flowFile,
+ final ByteArrayOutputStream baos,
+ final RecordSetWriter writer,
+ final Record record) throws IOException {
+ baos.reset();
+ writer.write(record);
+ writer.flush();
+ return publishOneMessage(context, flowFile, baos.toByteArray());
+ }
+
+ private ApiFuture<String> publishOneMessage(final ProcessContext context,
+ final FlowFile flowFile,
+ final byte[] content) {
+ final PubsubMessage message = PubsubMessage.newBuilder()
+ .setData(ByteString.copyFrom(content))
+ .setPublishTime(Timestamp.newBuilder().build())
+ .putAllAttributes(getDynamicAttributesMap(context, flowFile))
+ .build();
+ return publisher.publish(message);
+ }
+
+    /**
+     * Resolves the outcome of each FlowFile via FlowFileResult.reconcile(), adds the topic name attribute,
+     * and transfers the FlowFile to the resulting relationship.
+     * A provenance SEND event is recorded only for FlowFiles routed to success, matching the previous behavior
+     * of reporting provenance for successfully published FlowFiles only.
+     *
+     * @param session         session used to update attributes, report provenance and transfer FlowFiles
+     * @param stopWatch       started when onTrigger began; its elapsed time is reported as transmission time
+     * @param flowFileResults per-FlowFile publish tracking built during this invocation
+     */
+    private void finishBatch(final ProcessSession session,
+                             final StopWatch stopWatch,
+                             final List<FlowFileResult> flowFileResults) {
+        final String topicName = publisher.getTopicNameString();
+        // Loop invariant: every FlowFile in the batch was published to the same topic
+        final String transitUri = String.format(TRANSIT_URI_FORMAT_STRING, topicName);
+        for (final FlowFileResult flowFileResult : flowFileResults) {
+            final Relationship relationship = flowFileResult.reconcile();
+            final Map<String, String> attributes = flowFileResult.getAttributes();
+            attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
+            final FlowFile flowFile = session.putAllAttributes(flowFileResult.getFlowFile(), attributes);
+            if (REL_SUCCESS.equals(relationship)) {
+                session.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            }
+            session.transfer(flowFile, relationship);
+        }
+    }
+
+ protected void addCallback(final ApiFuture<String> apiFuture, final
ApiFutureCallback<? super String> callback, Executor executor) {
+ ApiFutures.addCallback(apiFuture, callback, executor);
}
@OnStopped
@@ -310,14 +468,22 @@ public class PublishGCPubSub extends
AbstractGCPubSubWithProxyProcessor {
}
private Publisher.Builder getPublisherBuilder(ProcessContext context) {
- final Long batchSize = context.getProperty(BATCH_SIZE).asLong();
+        final Long batchSizeThreshold = context.getProperty(BATCH_SIZE_THRESHOLD).asLong();
+        // Batch Bytes Threshold and API Endpoint support Variable Registry expressions, so evaluate them before use
+        final long batchBytesThreshold = context.getProperty(BATCH_BYTES_THRESHOLD).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue();
+        final Long batchDelayThreshold = context.getProperty(BATCH_DELAY_THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String endpoint = context.getProperty(API_ENDPOINT).evaluateAttributeExpressions().getValue();
- return Publisher.newBuilder(getTopicName(context))
+ final Publisher.Builder publisherBuilder =
Publisher.newBuilder(getTopicName(context))
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
.setChannelProvider(getTransportChannelProvider(context))
- .setBatchingSettings(BatchingSettings.newBuilder()
- .setElementCountThreshold(batchSize)
+ .setEndpoint(endpoint);
+
+ publisherBuilder.setBatchingSettings(BatchingSettings.newBuilder()
+ .setElementCountThreshold(batchSizeThreshold)
+ .setRequestByteThreshold(batchBytesThreshold)
+ .setDelayThreshold(Duration.ofMillis(batchDelayThreshold))
.setIsEnabled(true)
.build());
+ return publisherBuilder;
}
}
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java
index 603cfb954a..5b74f2015d 100644
---
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java
@@ -112,17 +112,6 @@ public class PublishGCPubSubLite extends
AbstractGCPubSubProcessor implements Ve
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
- public static final PropertyDescriptor BATCH_BYTES = new PropertyDescriptor
- .Builder().name("gcp-batch-bytes")
- .displayName("Batch Bytes Threshold")
- .description("Publish request gets triggered based on this Batch
Bytes Threshold property and"
- + " the " + BATCH_SIZE.getDisplayName() + " property,
whichever condition is met first.")
- .required(true)
- .defaultValue("3 MB")
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .build();
-
private Publisher publisher = null;
@Override
@@ -130,8 +119,9 @@ public class PublishGCPubSubLite extends
AbstractGCPubSubProcessor implements Ve
return Collections.unmodifiableList(Arrays.asList(TOPIC_NAME,
GCP_CREDENTIALS_PROVIDER_SERVICE,
ORDERING_KEY,
- BATCH_SIZE,
- BATCH_BYTES));
+ BATCH_SIZE_THRESHOLD,
+ BATCH_BYTES_THRESHOLD,
+ BATCH_DELAY_THRESHOLD));
}
@Override
@@ -193,7 +183,7 @@ public class PublishGCPubSubLite extends
AbstractGCPubSubProcessor implements Ve
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
- final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger();
+ final int flowFileCount =
context.getProperty(BATCH_SIZE_THRESHOLD).asInteger();
final List<FlowFile> flowFiles = session.get(flowFileCount);
if (flowFiles.isEmpty()) {
@@ -290,9 +280,9 @@ public class PublishGCPubSubLite extends
AbstractGCPubSubProcessor implements Ve
.setTopicPath(topicPath)
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
.setBatchingSettings(BatchingSettings.newBuilder()
-
.setElementCountThreshold(context.getProperty(BATCH_SIZE).asLong())
- .setDelayThreshold(Duration.ofMillis(100))
-
.setRequestByteThreshold(context.getProperty(BATCH_BYTES).asDataSize(DataUnit.B).longValue())
+
.setElementCountThreshold(context.getProperty(BATCH_SIZE_THRESHOLD).asLong())
+
.setRequestByteThreshold(context.getProperty(BATCH_BYTES_THRESHOLD).asDataSize(DataUnit.B).longValue())
+
.setDelayThreshold(Duration.ofMillis(context.getProperty(BATCH_DELAY_THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS)))
.setIsEnabled(true)
.build())
.build();
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/FlowFileResult.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/FlowFileResult.java
new file mode 100644
index 0000000000..99300d1abe
--- /dev/null
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/FlowFileResult.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.publish;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.rpc.UnavailableException;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.gcp.pubsub.PubSubAttributes;
+import org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Tracking of an interaction from NiFi {@link PublishGCPubSub} processor to Google PubSub endpoint.
+ * <p>
+ * One instance covers every PubSub message published on behalf of a single FlowFile (for example, one message for the
+ * whole FlowFile, or one message per record).
+ */
+public class FlowFileResult {
+    private static final Logger logger = LoggerFactory.getLogger(FlowFileResult.class);
+
+    private final FlowFile flowFile;
+    private final Map<String, String> attributes;
+    private final List<ApiFuture<String>> futures;
+    private final List<String> successes;
+    private final List<Throwable> failures;
+
+    /**
+     * Creates a tracker for the messages published on behalf of one FlowFile.
+     *
+     * @param flowFile  the FlowFile whose content was published
+     * @param futures   one future per PubSub message published for the FlowFile
+     * @param successes message ids recorded by a {@link TrackedApiFutureCallback}; callbacks may run on client
+     *                  library threads, so this list presumably needs to be thread-safe (confirm at the call site)
+     * @param failures  send exceptions recorded by a {@link TrackedApiFutureCallback}; same thread-safety note applies
+     */
+    public FlowFileResult(final FlowFile flowFile, final List<ApiFuture<String>> futures,
+                          final List<String> successes, final List<Throwable> failures) {
+        this.flowFile = flowFile;
+        this.attributes = new LinkedHashMap<>();
+        this.futures = futures;
+        this.successes = successes;
+        this.failures = failures;
+    }
+
+    /**
+     * After all in-flight messages have results, calculate appropriate {@link Relationship}.
+     * <p>
+     * Blocks until every send has completed and its callback has recorded a success or a failure. When every send
+     * succeeded, the message id (single message) or the message count (several messages) is added to
+     * {@link #getAttributes()}. An interrupt received while waiting does not abandon the wait, because routing needs
+     * every outcome; the thread's interrupt status is restored before this method returns.
+     *
+     * @return {@link PublishGCPubSub#REL_SUCCESS}, {@link PublishGCPubSub#REL_RETRY} or {@link PublishGCPubSub#REL_FAILURE}
+     */
+    public Relationship reconcile() {
+        awaitResults();
+        if (futures.size() == successes.size()) {
+            if (futures.size() == 1) {
+                attributes.put(PubSubAttributes.MESSAGE_ID_ATTRIBUTE, successes.get(0));
+            } else {
+                attributes.put(PubSubAttributes.RECORDS_ATTRIBUTE, Integer.toString(futures.size()));
+            }
+        }
+        if (!failures.isEmpty()) {
+            logger.error("Failed to publish {} of {} PubSub messages for {}",
+                    failures.size(), futures.size(), flowFile, failures.get(0));
+        }
+        return RelationshipMapper.toRelationship(failures);
+    }
+
+    public FlowFile getFlowFile() {
+        return flowFile;
+    }
+
+    public Map<String, String> getAttributes() {
+        return attributes;
+    }
+
+    /**
+     * @return true while fewer results have been recorded by the callbacks than messages were sent
+     */
+    private boolean hasPendingResults() {
+        return futures.size() > (successes.size() + failures.size());
+    }
+
+    /**
+     * Waits until every send has completed and every callback has recorded its result.
+     */
+    private void awaitResults() {
+        boolean interrupted = false;
+        ApiFuture<List<String>> allSends = null;
+        while (hasPendingResults()) {
+            if (allSends == null) {
+                // Created lazily: there is nothing to wait on when every callback has already reported.
+                // successfulAsList completes only after every send has completed, whether it succeeded or failed.
+                // allAsList fails on the first failed send, so calling get() on it again returns at once and spins.
+                allSends = ApiFutures.successfulAsList(futures);
+            }
+            if (allSends.isDone()) {
+                // Every send is complete, but callbacks run asynchronously and may still be recording results
+                Thread.yield();
+                continue;
+            }
+            try {
+                allSends.get();
+            } catch (final InterruptedException e) {
+                // Keep waiting because routing needs every outcome; the interrupt status is restored below
+                interrupted = true;
+            } catch (final ExecutionException e) {
+                logger.error("Failed to reconcile PubSub send operation status", e);
+            }
+        }
+        if (interrupted) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Logic to derive an appropriate {@link Relationship} from the feedback provided by the client library.
+     * <p>
+     * Each {@link com.google.pubsub.v1.PubsubMessage} is associated with a {@link TrackedApiFutureCallback} at time of
+     * submission to the client library. This callback allows the client library to convey information to the caller
+     * about the result of the (asynchronous) send. If a send fails, an appropriate exception is conveyed, providing
+     * detail about the send failure; otherwise a message id (provided by the service) is supplied.
+     * <p>
+     * Types of exceptions might be classified into "retryable" (another send may be attempted) or non-retryable.
+     */
+    private static class RelationshipMapper {
+
+        /**
+         * Retryable exceptions indicate transient conditions; another send attempt might succeed.
+         */
+        private static final Collection<Class<? extends Throwable>> RETRY_EXCEPTIONS = Collections.singleton(
+                UnavailableException.class);
+
+        /**
+         * Any non-retryable failure routes to failure. Otherwise, any retryable failure routes to retry. With no
+         * failures at all, the FlowFile routes to success.
+         */
+        private static Relationship toRelationship(final List<Throwable> failures) {
+            boolean isRetry = false;
+            for (final Throwable failure : failures) {
+                if (!isRetryException(failure)) {
+                    return PublishGCPubSub.REL_FAILURE;
+                }
+                isRetry = true;
+            }
+            return isRetry ? PublishGCPubSub.REL_RETRY : PublishGCPubSub.REL_SUCCESS;
+        }
+
+        /**
+         * Exceptions provided by client library might include a nested exception that indicates a transient condition,
+         * so the entire exception chain should be checked. Subclasses of a retryable type are retryable as well.
+         */
+        private static boolean isRetryException(final Throwable t) {
+            if (t == null) {
+                return false;
+            }
+            for (final Class<? extends Throwable> retryException : RETRY_EXCEPTIONS) {
+                if (retryException.isInstance(t)) {
+                    return true;
+                }
+            }
+            final Throwable cause = t.getCause();
+            return !t.equals(cause) && isRetryException(cause);
+        }
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/MessageDerivationStrategy.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/MessageDerivationStrategy.java
new file mode 100644
index 0000000000..9ff30e72f5
--- /dev/null
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/MessageDerivationStrategy.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.publish;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * How the <code>PublishGCPubSub</code> processor turns incoming FlowFiles into Google Cloud PubSub messages.
+ */
+public enum MessageDerivationStrategy implements DescribedValue {
+
+    /** The whole FlowFile becomes a single PubSub message. */
+    FLOWFILE_ORIENTED("FlowFile Oriented", "Each incoming FlowFile is sent as a Google Cloud PubSub message"),
+
+    /** The FlowFile is parsed into records and each record becomes its own PubSub message. */
+    RECORD_ORIENTED("Record Oriented", "Each incoming FlowFile is parsed into NiFi records, which are each sent as a Google Cloud PubSub message");
+
+    private final String label;
+
+    private final String explanation;
+
+    MessageDerivationStrategy(final String displayName, final String description) {
+        label = displayName;
+        explanation = description;
+    }
+
+    /**
+     * @return the constant's name, which serves as the property value
+     */
+    @Override
+    public String getValue() {
+        return this.name();
+    }
+
+    @Override
+    public String getDisplayName() {
+        return label;
+    }
+
+    @Override
+    public String getDescription() {
+        return explanation;
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/TrackedApiFutureCallback.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/TrackedApiFutureCallback.java
new file mode 100644
index 0000000000..f4b9be8f5f
--- /dev/null
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/TrackedApiFutureCallback.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.publish;
+
+import com.google.api.core.ApiFutureCallback;
+
+import java.util.List;
+
+/**
+ * {@link ApiFutureCallback} that records the outcome of each Google PubSub send into caller-supplied lists: message
+ * ids on success, exceptions on failure. The recorded failures drive FlowFile routing decisions.
+ * <p>
+ * NOTE(review): the client library may invoke these callbacks on its own threads, so the supplied lists presumably
+ * need to be thread-safe; confirm at the call site.
+ */
+public class TrackedApiFutureCallback implements ApiFutureCallback<String> {
+    private final List<String> messageIds;
+    private final List<Throwable> errors;
+
+    /**
+     * @param successes receives the message id of every successful send
+     * @param failures  receives the exception of every failed send
+     */
+    public TrackedApiFutureCallback(final List<String> successes, final List<Throwable> failures) {
+        messageIds = successes;
+        errors = failures;
+    }
+
+    /** Records the message id supplied for a successful send. */
+    @Override
+    public void onSuccess(final String result) {
+        messageIds.add(result);
+    }
+
+    /** Records the exception describing why a send failed. */
+    @Override
+    public void onFailure(final Throwable t) {
+        errors.add(t);
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubIT.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubIT.java
index 0d7a1da562..072d8a94c8 100644
---
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubIT.java
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubIT.java
@@ -42,7 +42,7 @@ public class ConsumeGCPubSubIT extends AbstractGCPubSubIT{
runner.setProperty(ConsumeGCPubSub.PROJECT_ID, PROJECT_ID);
runner.setProperty(ConsumeGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE,
CONTROLLER_SERVICE);
runner.setProperty(ConsumeGCPubSub.SUBSCRIPTION, subscription);
- runner.setProperty(ConsumeGCPubSub.BATCH_SIZE, "10");
+ runner.setProperty(ConsumeGCPubSub.BATCH_SIZE_THRESHOLD, "10");
runner.assertValid();
@@ -64,7 +64,7 @@ public class ConsumeGCPubSubIT extends AbstractGCPubSubIT{
runner.setProperty(ConsumeGCPubSub.PROJECT_ID, PROJECT_ID);
runner.setProperty(ConsumeGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE,
CONTROLLER_SERVICE);
runner.setProperty(ConsumeGCPubSub.SUBSCRIPTION, subscription);
- runner.setProperty(ConsumeGCPubSub.BATCH_SIZE, "2");
+ runner.setProperty(ConsumeGCPubSub.BATCH_SIZE_THRESHOLD, "2");
runner.assertValid();
@@ -88,7 +88,7 @@ public class ConsumeGCPubSubIT extends AbstractGCPubSubIT{
runner.setProperty(ConsumeGCPubSub.PROJECT_ID, PROJECT_ID);
runner.setProperty(ConsumeGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE,
CONTROLLER_SERVICE);
runner.setProperty(ConsumeGCPubSub.SUBSCRIPTION, subscription);
- runner.setProperty(ConsumeGCPubSub.BATCH_SIZE, "2");
+ runner.setProperty(ConsumeGCPubSub.BATCH_SIZE_THRESHOLD, "2");
runner.assertValid();
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubIT.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubIT.java
index 145eb2b21a..3e6de5b657 100644
---
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubIT.java
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubIT.java
@@ -40,7 +40,7 @@ public class PublishGCPubSubIT extends AbstractGCPubSubIT{
runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT_ID);
runner.setProperty(PublishGCPubSub.TOPIC_NAME, topic);
runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE,
CONTROLLER_SERVICE);
- runner.setProperty(PublishGCPubSub.BATCH_SIZE, "1");
+ runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "1");
runner.assertValid();
@@ -61,7 +61,7 @@ public class PublishGCPubSubIT extends AbstractGCPubSubIT{
runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT_ID);
runner.setProperty(PublishGCPubSub.TOPIC_NAME, topic);
runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE,
CONTROLLER_SERVICE);
- runner.setProperty(PublishGCPubSub.BATCH_SIZE, "1");
+ runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "1");
runner.assertValid();
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java
new file mode 100644
index 0000000000..2870942834
--- /dev/null
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.UnavailableException;
+import com.google.cloud.pubsub.v1.Publisher;
+import io.grpc.Status;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.processor.ProcessContext;
+import
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.processors.gcp.pubsub.publish.MessageDerivationStrategy;
+import org.apache.nifi.processors.gcp.pubsub.publish.TrackedApiFutureCallback;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.Executor;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PublishGCPubSubTest {
+
+    private static final String TOPIC = "my-topic";
+    private static final String PROJECT = "my-project";
+    private static final String RECORDS_RESOURCE = "pubsub/records.json";
+
+    /** Failure reported to every send callback; {@code null} makes every send succeed. */
+    private Throwable throwable;
+    private Publisher publisherMock;
+    private TestRunner runner;
+
+    @BeforeEach
+    void setRunner() {
+        throwable = null;
+        publisherMock = mock(Publisher.class);
+        runner = TestRunners.newTestRunner(new PublishGCPubSub() {
+            @Override
+            @OnScheduled
+            public void onScheduled(ProcessContext context) {
+                // Inject the mock rather than building a real publisher
+                publisher = publisherMock;
+            }
+
+            @Override
+            protected void addCallback(ApiFuture<String> apiFuture, ApiFutureCallback<? super String> callback, Executor executor) {
+                // Report the outcome synchronously instead of registering the callback on the future
+                if (!(callback instanceof TrackedApiFutureCallback)) {
+                    return;
+                }
+                final TrackedApiFutureCallback trackedCallback = (TrackedApiFutureCallback) callback;
+                if (throwable != null) {
+                    trackedCallback.onFailure(throwable);
+                } else {
+                    trackedCallback.onSuccess(Long.toString(System.currentTimeMillis()));
+                }
+            }
+        });
+    }
+
+    @Test
+    void testPropertyDescriptors() throws InitializationException {
+        runner.assertNotValid();
+
+        runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner));
+        runner.assertNotValid();
+
+        runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
+        runner.assertNotValid();
+
+        runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
+        runner.assertValid();
+
+        // The API endpoint requires a port
+        runner.setProperty(PublishGCPubSub.API_ENDPOINT, "localhost");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.API_ENDPOINT, "localhost:443");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "-1");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "15");
+        runner.assertValid();
+
+        // Size and delay thresholds require units
+        runner.setProperty(PublishGCPubSub.BATCH_BYTES_THRESHOLD, "3");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.BATCH_BYTES_THRESHOLD, "3 MB");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.BATCH_DELAY_THRESHOLD, "100");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.BATCH_DELAY_THRESHOLD, "100 millis");
+        runner.assertValid();
+    }
+
+    @Test
+    void testSendOneSuccessFlowFileStrategy() throws InitializationException {
+        setRequiredProperties();
+
+        runner.enqueue("text");
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PublishGCPubSub.REL_SUCCESS).get(0);
+        assertNotNull(flowFile.getAttribute(PubSubAttributes.MESSAGE_ID_ATTRIBUTE));
+    }
+
+    @Test
+    void testSendOneRetryFlowFileStrategy() throws InitializationException {
+        throwable = new UnavailableException(null, GrpcStatusCode.of(Status.Code.UNAVAILABLE), true);
+        setRequiredProperties();
+
+        runner.enqueue("text");
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_RETRY, 1);
+    }
+
+    @Test
+    void testSendOneFailureFlowFileStrategy() throws InitializationException {
+        setRequiredProperties();
+        runner.setProperty(PublishGCPubSub.MAX_MESSAGE_SIZE, "16 B");
+        runner.enqueue("some really long text");
+
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_FAILURE, 1);
+    }
+
+    @Test
+    void testSendOneSuccessRecordStrategy() throws InitializationException, IOException {
+        setRequiredProperties();
+        setRecordOrientedProperties();
+
+        enqueueRecords();
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PublishGCPubSub.REL_SUCCESS).get(0);
+        assertEquals("3", flowFile.getAttribute(PubSubAttributes.RECORDS_ATTRIBUTE));
+    }
+
+    @Test
+    void testSendOneRetryRecordStrategy() throws InitializationException, IOException {
+        throwable = new UnavailableException(null, GrpcStatusCode.of(Status.Code.UNAVAILABLE), true);
+        setRequiredProperties();
+        setRecordOrientedProperties();
+
+        enqueueRecords();
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_RETRY, 1);
+    }
+
+    @Test
+    void testSendOneFailureRecordStrategy() throws InitializationException, IOException {
+        throwable = new IllegalStateException("testSendOne_Failure_RecordStrategy");
+        setRequiredProperties();
+        setRecordOrientedProperties();
+
+        enqueueRecords();
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_FAILURE, 1);
+    }
+
+    /** Sets credentials service, topic and project: the minimum configuration that validates. */
+    private void setRequiredProperties() throws InitializationException {
+        runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner));
+        runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
+        runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
+    }
+
+    /** Selects the record-oriented strategy with JSON reader and writer services. */
+    private void setRecordOrientedProperties() throws InitializationException {
+        runner.setProperty(PublishGCPubSub.RECORD_READER, registerService(runner, new JsonTreeReader()));
+        runner.setProperty(PublishGCPubSub.RECORD_WRITER, registerService(runner, new JsonRecordSetWriter()));
+        runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue());
+    }
+
+    /** Enqueues the three-record JSON array from the test resources. */
+    private void enqueueRecords() throws IOException {
+        runner.enqueue(IOUtils.toByteArray(Objects.requireNonNull(
+                getClass().getClassLoader().getResource(RECORDS_RESOURCE))));
+    }
+
+    private static String getCredentialsServiceId(final TestRunner runner) throws InitializationException {
+        final ControllerService credentialsService = mock(GCPCredentialsControllerService.class);
+        final String credentialsServiceId = GCPCredentialsControllerService.class.getSimpleName();
+        when(credentialsService.getIdentifier()).thenReturn(credentialsServiceId);
+        runner.addControllerService(credentialsServiceId, credentialsService);
+        runner.enableControllerService(credentialsService);
+        return credentialsServiceId;
+    }
+
+    /** Adds and enables the service under its class name, returning that identifier. */
+    private static String registerService(final TestRunner runner, final ControllerService service) throws InitializationException {
+        final String serviceId = service.getClass().getName();
+        runner.addControllerService(serviceId, service);
+        runner.enableControllerService(service);
+        return serviceId;
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLiteTest.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLiteTest.java
new file mode 100644
index 0000000000..9204fc6aff
--- /dev/null
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLiteTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.lite;
+
+import org.apache.nifi.controller.ControllerService;
+import
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class PublishGCPubSubLiteTest {
+
+    private static final String TOPIC_PATH = "projects/my-project/locations/my-location/topics/my-topic";
+
+    private TestRunner runner;
+
+    @BeforeEach
+    void setRunner() {
+        runner = TestRunners.newTestRunner(PublishGCPubSubLite.class);
+    }
+
+    @Test
+    void testPropertyDescriptors() throws InitializationException {
+        // Neither credentials nor topic are configured yet
+        runner.assertNotValid();
+
+        runner.setProperty(PublishGCPubSubLite.GCP_CREDENTIALS_PROVIDER_SERVICE, enableCredentialsService());
+        runner.assertNotValid();
+
+        runner.setProperty(PublishGCPubSubLite.TOPIC_NAME, TOPIC_PATH);
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSubLite.BATCH_SIZE_THRESHOLD, "-1");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSubLite.BATCH_SIZE_THRESHOLD, "15");
+        runner.assertValid();
+
+        // Size and delay thresholds require units
+        runner.setProperty(PublishGCPubSubLite.BATCH_BYTES_THRESHOLD, "3");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSubLite.BATCH_BYTES_THRESHOLD, "3 MB");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSubLite.BATCH_DELAY_THRESHOLD, "100");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSubLite.BATCH_DELAY_THRESHOLD, "100 millis");
+        runner.assertValid();
+    }
+
+    /** Adds and enables a GCP credentials service, returning its identifier. */
+    private String enableCredentialsService() throws InitializationException {
+        final ControllerService credentialsService = new GCPCredentialsControllerService();
+        final String credentialsServiceId = GCPCredentialsControllerService.class.getSimpleName();
+        runner.addControllerService(credentialsServiceId, credentialsService);
+        runner.enableControllerService(credentialsService);
+        return credentialsServiceId;
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/pubsub/records.json
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/pubsub/records.json
new file mode 100644
index 0000000000..08ae479321
--- /dev/null
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/pubsub/records.json
@@ -0,0 +1,17 @@
+[
+ {
+ "name": "Acme1",
+ "address": "1234 First Street",
+ "zip": "12345"
+ },
+ {
+ "name": "Acme2",
+ "address": "1234 Second Street",
+ "zip": "12345"
+ },
+ {
+ "name": "Acme3",
+ "address": "1234 Third Street",
+ "zip": "12345"
+ }
+]
\ No newline at end of file