This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 336b857442 NIFI-11553 Add Record handling and more Properties for GCP 
PubSub
336b857442 is described below

commit 336b857442109e1f2c8b3b6108106514609159f5
Author: Paul Grey <[email protected]>
AuthorDate: Tue May 16 09:38:39 2023 -0400

    NIFI-11553 Add Record handling and more Properties for GCP PubSub
    
    - Added Message Derivation Strategy to PublishGCPubSub with FlowFile and 
Record options
    - Added API Endpoint property to PublishGCPPubSub and ConsumeGCPubSub
    - Added Batch configuration properties
    
    This closes #7274
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../gcp/pubsub/AbstractGCPubSubProcessor.java      |  39 ++-
 .../processors/gcp/pubsub/ConsumeGCPubSub.java     |  13 +-
 .../processors/gcp/pubsub/PubSubAttributes.java    |   3 +
 .../processors/gcp/pubsub/PublishGCPubSub.java     | 320 ++++++++++++++++-----
 .../gcp/pubsub/lite/PublishGCPubSubLite.java       |  24 +-
 .../gcp/pubsub/publish/FlowFileResult.java         | 143 +++++++++
 .../pubsub/publish/MessageDerivationStrategy.java  |  53 ++++
 .../pubsub/publish/TrackedApiFutureCallback.java   |  45 +++
 .../processors/gcp/pubsub/ConsumeGCPubSubIT.java   |   6 +-
 .../processors/gcp/pubsub/PublishGCPubSubIT.java   |   4 +-
 .../processors/gcp/pubsub/PublishGCPubSubTest.java | 233 +++++++++++++++
 .../gcp/pubsub/lite/PublishGCPubSubLiteTest.java   |  65 +++++
 .../src/test/resources/pubsub/records.json         |  17 ++
 13 files changed, 859 insertions(+), 106 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
index fc135dba81..6602a0e0c8 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
@@ -19,9 +19,11 @@ package org.apache.nifi.processors.gcp.pubsub;
 import com.google.auth.oauth2.GoogleCredentials;
 import com.google.cloud.ServiceOptions;
 
+import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.VerifiableProcessor;
@@ -36,9 +38,9 @@ import java.util.Set;
 
 public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor 
implements VerifiableProcessor {
 
-    public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor BATCH_SIZE_THRESHOLD = new 
PropertyDescriptor.Builder()
             .name("gcp-pubsub-publish-batch-size")
-            .displayName("Batch Size")
+            .displayName("Batch Size Threshold")
             .description("Indicates the number of messages the cloud service 
should bundle together in a batch. If not set and left empty, only one message 
" +
                     "will be used in a batch")
             .required(true)
@@ -46,6 +48,39 @@ public abstract class AbstractGCPubSubProcessor extends 
AbstractGCPProcessor imp
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor BATCH_BYTES_THRESHOLD = new 
PropertyDescriptor.Builder()
+            .name("gcp-batch-bytes")
+            .displayName("Batch Bytes Threshold")
+            .description("Publish request gets triggered based on this Batch 
Bytes Threshold property and"
+                    + " the " + BATCH_SIZE_THRESHOLD.getDisplayName() + " 
property, whichever condition is met first.")
+            .required(true)
+            .defaultValue("3 MB")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor BATCH_DELAY_THRESHOLD = new 
PropertyDescriptor.Builder()
+            .name("gcp-pubsub-publish-batch-delay")
+            .displayName("Batch Delay Threshold")
+            .description("Indicates the delay threshold to use for batching. 
After this amount of time has elapsed " +
+                    "(counting from the first element added), the elements 
will be wrapped up in a batch and sent. " +
+                    "This value should not be set too high, usually on the 
order of milliseconds. Otherwise, calls " +
+                    "might appear to never complete.")
+            .required(true)
+            .defaultValue("100 millis")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor API_ENDPOINT = new 
PropertyDescriptor
+            .Builder().name("api-endpoint")
+            .displayName("API Endpoint")
+            .description("Override the gRPC endpoint in the form of 
[host:port]")
+            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .defaultValue(PublisherStubSettings.getDefaultEndpoint())  // 
identical to SubscriberStubSettings.getDefaultEndpoint()
+            .build();
+
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("FlowFiles are routed to this relationship after a 
successful Google Cloud Pub/Sub operation.")
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
index 3928a2ae63..77d9fcac87 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
@@ -102,7 +102,7 @@ public class ConsumeGCPubSub extends 
AbstractGCPubSubWithProxyProcessor {
 
     @OnScheduled
     public void onScheduled(ProcessContext context) {
-        final Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
+        final Integer batchSize = 
context.getProperty(BATCH_SIZE_THRESHOLD).asInteger();
 
         pullRequest = PullRequest.newBuilder()
                 .setMaxMessages(batchSize)
@@ -192,7 +192,8 @@ public class ConsumeGCPubSub extends 
AbstractGCPubSubWithProxyProcessor {
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> descriptors = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
         descriptors.add(SUBSCRIPTION);
-        descriptors.add(BATCH_SIZE);
+        descriptors.add(BATCH_SIZE_THRESHOLD);
+        descriptors.add(API_ENDPOINT);
         return Collections.unmodifiableList(descriptors);
     }
 
@@ -268,11 +269,13 @@ public class ConsumeGCPubSub extends 
AbstractGCPubSubWithProxyProcessor {
     }
 
     private SubscriberStub getSubscriber(final ProcessContext context) throws 
IOException {
-        final SubscriberStubSettings subscriberStubSettings = 
SubscriberStubSettings.newBuilder()
+        final String endpoint = context.getProperty(API_ENDPOINT).getValue();
+
+        final SubscriberStubSettings.Builder subscriberBuilder = 
SubscriberStubSettings.newBuilder()
                 
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
                 
.setTransportChannelProvider(getTransportChannelProvider(context))
-                .build();
+                .setEndpoint(endpoint);
 
-        return GrpcSubscriberStub.create(subscriberStubSettings);
+        return GrpcSubscriberStub.create(subscriberBuilder.build());
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java
index cfcdda5649..b9654112d4 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java
@@ -21,6 +21,9 @@ public class PubSubAttributes {
     public static final String MESSAGE_ID_ATTRIBUTE = "gcp.pubsub.messageId";
     public static final String MESSAGE_ID_DESCRIPTION = "ID of the pubsub 
message published to the configured Google Cloud PubSub topic";
 
+    public static final String RECORDS_ATTRIBUTE = "gcp.pubsub.count.records";
+    public static final String RECORDS_DESCRIPTION = "Count of pubsub messages 
published to the configured Google Cloud PubSub topic";
+
     public static final String TOPIC_NAME_ATTRIBUTE = "gcp.pubsub.topic";
     public static final String TOPIC_NAME_DESCRIPTION = "Name of the Google 
Cloud PubSub topic the message was published to";
 
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
index 86545ab5fd..f2d5ed7ebd 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
@@ -17,13 +17,15 @@
 package org.apache.nifi.processors.gcp.pubsub;
 
 import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
 import com.google.api.gax.batching.BatchingSettings;
 import com.google.api.gax.core.FixedCredentialsProvider;
 import com.google.api.gax.rpc.ApiException;
-import com.google.api.gax.rpc.DeadlineExceededException;
 import com.google.cloud.pubsub.v1.Publisher;
 import com.google.cloud.pubsub.v1.stub.GrpcPublisherStub;
 import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.iam.v1.TestIamPermissionsRequest;
 import com.google.iam.v1.TestIamPermissionsResponse;
 import com.google.protobuf.ByteString;
@@ -48,11 +50,27 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.pubsub.publish.FlowFileResult;
+import org.apache.nifi.processors.gcp.pubsub.publish.MessageDerivationStrategy;
+import org.apache.nifi.processors.gcp.pubsub.publish.TrackedApiFutureCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.StopWatch;
+import org.threeten.bp.Duration;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -64,12 +82,13 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
 import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.RECORDS_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.RECORDS_DESCRIPTION;
 import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE;
 import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION;
 
@@ -82,12 +101,63 @@ import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_
         description = "Attributes to be set for the outgoing Google Cloud 
PubSub message", expressionLanguageScope = 
ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
 @WritesAttributes({
         @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = 
MESSAGE_ID_DESCRIPTION),
+        @WritesAttribute(attribute = RECORDS_ATTRIBUTE, description = 
RECORDS_DESCRIPTION),
         @WritesAttribute(attribute = TOPIC_NAME_ATTRIBUTE, description = 
TOPIC_NAME_DESCRIPTION)
 })
 @SystemResourceConsideration(resource = SystemResource.MEMORY, description = 
"The entirety of the FlowFile's content "
         + "will be read into memory to be sent as a PubSub message.")
 public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor {
     private static final List<String> REQUIRED_PERMISSIONS = 
Collections.singletonList("pubsub.topics.publish");
+    private static final String TRANSIT_URI_FORMAT_STRING = "gcp://%s";
+
+    public static final PropertyDescriptor MAX_BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Input Batch Size")
+            .displayName("Input Batch Size")
+            .description("Maximum number of FlowFiles processed for each 
Processor invocation")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.NUMBER_VALIDATOR)
+            .defaultValue("100")
+            .build();
+
+    public static final PropertyDescriptor MESSAGE_DERIVATION_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("Message Derivation Strategy")
+            .displayName("Message Derivation Strategy")
+            .description("The strategy used to publish the incoming FlowFile 
to the Google Cloud PubSub endpoint.")
+            .required(true)
+            
.defaultValue(MessageDerivationStrategy.FLOWFILE_ORIENTED.getValue())
+            .allowableValues(MessageDerivationStrategy.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("Record Reader")
+            .displayName("Record Reader")
+            .description("The Record Reader to use for incoming FlowFiles")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .dependsOn(MESSAGE_DERIVATION_STRATEGY, 
MessageDerivationStrategy.RECORD_ORIENTED.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("Record Writer")
+            .displayName("Record Writer")
+            .description("The Record Writer to use in order to serialize the 
data before sending to GCPubSub endpoint")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .dependsOn(MESSAGE_DERIVATION_STRATEGY, 
MessageDerivationStrategy.RECORD_ORIENTED.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor MAX_MESSAGE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Maximum Message Size")
+            .displayName("Maximum Message Size")
+            .description("The maximum size of a Google PubSub message in 
bytes. Defaults to 1 MB (1048576 bytes)")
+            .dependsOn(MESSAGE_DERIVATION_STRATEGY, 
MessageDerivationStrategy.FLOWFILE_ORIENTED.getValue())
+            .required(true)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("1 MB")
+            .build();
 
     public static final PropertyDescriptor TOPIC_NAME = new 
PropertyDescriptor.Builder()
             .name("gcp-pubsub-topic")
@@ -103,14 +173,21 @@ public class PublishGCPubSub extends 
AbstractGCPubSubWithProxyProcessor {
             .description("FlowFiles are routed to this relationship if the 
Google Cloud Pub/Sub operation fails but attempting the operation again may 
succeed.")
             .build();
 
-    private Publisher publisher = null;
-    private final AtomicReference<Exception> storedException = new 
AtomicReference<>();
+    protected Publisher publisher = null;
 
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> descriptors = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
+        descriptors.add(MAX_BATCH_SIZE);
+        descriptors.add(MESSAGE_DERIVATION_STRATEGY);
+        descriptors.add(RECORD_READER);
+        descriptors.add(RECORD_WRITER);
+        descriptors.add(MAX_MESSAGE_SIZE);
         descriptors.add(TOPIC_NAME);
-        descriptors.add(BATCH_SIZE);
+        descriptors.add(BATCH_SIZE_THRESHOLD);
+        descriptors.add(BATCH_BYTES_THRESHOLD);
+        descriptors.add(BATCH_DELAY_THRESHOLD);
+        descriptors.add(API_ENDPOINT);
         return Collections.unmodifiableList(descriptors);
     }
 
@@ -139,8 +216,7 @@ public class PublishGCPubSub extends 
AbstractGCPubSubWithProxyProcessor {
         try {
             publisher = getPublisherBuilder(context).build();
         } catch (IOException e) {
-            getLogger().error("Failed to create Google Cloud PubSub Publisher 
due to {}", new Object[]{e});
-            storedException.set(e);
+            throw new ProcessException("Failed to create Google Cloud PubSub 
Publisher", e);
         }
     }
 
@@ -171,25 +247,26 @@ public class PublishGCPubSub extends 
AbstractGCPubSubWithProxyProcessor {
                         
.setTransportChannelProvider(getTransportChannelProvider(context))
                         .build();
 
-                final GrpcPublisherStub publisherStub = 
GrpcPublisherStub.create(publisherStubSettings);
-                final String topicName = 
context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
-                final TestIamPermissionsRequest request = 
TestIamPermissionsRequest.newBuilder()
-                        .addAllPermissions(REQUIRED_PERMISSIONS)
-                        .setResource(topicName)
-                        .build();
-                final TestIamPermissionsResponse response = 
publisherStub.testIamPermissionsCallable().call(request);
-                if (response.getPermissionsCount() >= 
REQUIRED_PERMISSIONS.size()) {
-                    results.add(new ConfigVerificationResult.Builder()
-                            .verificationStepName("Test IAM Permissions")
-                            
.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
-                            .explanation(String.format("Verified Topic [%s] 
exists and the configured user has the correct permissions.", topicName))
-                            .build());
-                } else {
-                    results.add(new ConfigVerificationResult.Builder()
-                            .verificationStepName("Test IAM Permissions")
-                            .outcome(ConfigVerificationResult.Outcome.FAILED)
-                            .explanation(String.format("The configured user 
does not have the correct permissions on Topic [%s].", topicName))
-                            .build());
+                try (GrpcPublisherStub publisherStub = 
GrpcPublisherStub.create(publisherStubSettings)) {
+                    final String topicName = 
context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
+                    final TestIamPermissionsRequest request = 
TestIamPermissionsRequest.newBuilder()
+                            .addAllPermissions(REQUIRED_PERMISSIONS)
+                            .setResource(topicName)
+                            .build();
+                    final TestIamPermissionsResponse response = 
publisherStub.testIamPermissionsCallable().call(request);
+                    if (response.getPermissionsCount() >= 
REQUIRED_PERMISSIONS.size()) {
+                        results.add(new ConfigVerificationResult.Builder()
+                                .verificationStepName("Test IAM Permissions")
+                                
.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
+                                .explanation(String.format("Verified Topic 
[%s] exists and the configured user has the correct permissions.", topicName))
+                                .build());
+                    } else {
+                        results.add(new ConfigVerificationResult.Builder()
+                                .verificationStepName("Test IAM Permissions")
+                                
.outcome(ConfigVerificationResult.Outcome.FAILED)
+                                .explanation(String.format("The configured 
user does not have the correct permissions on Topic [%s].", topicName))
+                                .build());
+                    }
                 }
             } catch (final ApiException e) {
                 verificationLogger.error("The configured user appears to have 
the correct permissions, but the following error was encountered", e);
@@ -213,62 +290,143 @@ public class PublishGCPubSub extends 
AbstractGCPubSubWithProxyProcessor {
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-        final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger();
-        final List<FlowFile> flowFiles = session.get(flowFileCount);
-
-        if (flowFiles.isEmpty() || publisher == null) {
-            if (storedException.get() != null) {
-                getLogger().error("Google Cloud PubSub Publisher was not 
properly created due to {}", new Object[]{storedException.get()});
-            }
+        final StopWatch stopWatch = new StopWatch(true);
+        final MessageDerivationStrategy inputStrategy = 
MessageDerivationStrategy.valueOf(context.getProperty(MESSAGE_DERIVATION_STRATEGY).getValue());
+        final int maxBatchSize = 
context.getProperty(MAX_BATCH_SIZE).asInteger();
+        final List<FlowFile> flowFileBatch = session.get(maxBatchSize);
+        if (flowFileBatch.isEmpty()) {
             context.yield();
-            return;
+        } else if 
(MessageDerivationStrategy.FLOWFILE_ORIENTED.equals(inputStrategy)) {
+            onTriggerFlowFileStrategy(context, session, stopWatch, 
flowFileBatch);
+        } else if 
(MessageDerivationStrategy.RECORD_ORIENTED.equals(inputStrategy)) {
+            onTriggerRecordStrategy(context, session, stopWatch, 
flowFileBatch);
+        } else {
+            throw new IllegalStateException(inputStrategy.getValue());
         }
+    }
 
-        final long startNanos = System.nanoTime();
-        final List<FlowFile> successfulFlowFiles = new ArrayList<>();
-        final String topicName = getTopicName(context).toString();
+    private void onTriggerFlowFileStrategy(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch) throws ProcessException {
+        final long maxMessageSize = 
context.getProperty(MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).longValue();
 
-        try {
-            for (FlowFile flowFile : flowFiles) {
-                try {
-                    final ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
-                    session.exportTo(flowFile, baos);
-                    final ByteString flowFileContent = 
ByteString.copyFrom(baos.toByteArray());
-
-                    PubsubMessage message = 
PubsubMessage.newBuilder().setData(flowFileContent)
-                            .setPublishTime(Timestamp.newBuilder().build())
-                            .putAllAttributes(getDynamicAttributesMap(context, 
flowFile))
-                            .build();
+        final Executor executor = MoreExecutors.directExecutor();
+        final List<FlowFileResult> flowFileResults = new ArrayList<>();
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
-                    ApiFuture<String> messageIdFuture = 
publisher.publish(message);
+        for (final FlowFile flowFile : flowFileBatch) {
+            final List<ApiFuture<String>> futures = new ArrayList<>();
+            final List<String> successes = new ArrayList<>();
+            final List<Throwable> failures = new ArrayList<>();
 
-                    final Map<String, String> attributes = new HashMap<>();
-                    attributes.put(MESSAGE_ID_ATTRIBUTE, 
messageIdFuture.get());
-                    attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
+            if (flowFile.getSize() > maxMessageSize) {
+                final String message = String.format("FlowFile size %d exceeds 
MAX_MESSAGE_SIZE", flowFile.getSize());
+                failures.add(new IllegalArgumentException(message));
+                flowFileResults.add(new FlowFileResult(flowFile, futures, 
successes, failures));
+            } else {
+                baos.reset();
+                session.exportTo(flowFile, baos);
 
-                    flowFile = session.putAllAttributes(flowFile, attributes);
-                    successfulFlowFiles.add(flowFile);
-                } catch (InterruptedException | ExecutionException e) {
-                    if (e.getCause() instanceof DeadlineExceededException) {
-                        getLogger().error("Failed to publish the message to 
Google Cloud PubSub topic '{}' due to {} but attempting again may succeed " +
-                                        "so routing to retry", new 
Object[]{topicName, e.getLocalizedMessage()}, e);
-                        session.transfer(flowFile, REL_RETRY);
-                    } else {
-                        getLogger().error("Failed to publish the message to 
Google Cloud PubSub topic '{}'", topicName, e);
-                        session.transfer(flowFile, REL_FAILURE);
-                    }
-                    context.yield();
-                }
+                final ApiFuture<String> apiFuture = publishOneMessage(context, 
flowFile, baos.toByteArray());
+                futures.add(apiFuture);
+                addCallback(apiFuture, new TrackedApiFutureCallback(successes, 
failures), executor);
+                flowFileResults.add(new FlowFileResult(flowFile, futures, 
successes, failures));
             }
-        } finally {
-            if (!successfulFlowFiles.isEmpty()) {
-                session.transfer(successfulFlowFiles, REL_SUCCESS);
-                for (FlowFile flowFile : successfulFlowFiles) {
-                    final long transmissionMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-                    session.getProvenanceReporter().send(flowFile, topicName, 
transmissionMillis);
-                }
+        }
+        finishBatch(session, stopWatch, flowFileResults);
+    }
+
+    private void onTriggerRecordStrategy(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch) throws ProcessException {
+        try {
+            onTriggerRecordStrategyPublishRecords(context, session, stopWatch, 
flowFileBatch);
+        } catch (IOException | SchemaNotFoundException | 
MalformedRecordException e) {
+            throw new ProcessException("Record publishing failed", e);
+        }
+    }
+
+    private void onTriggerRecordStrategyPublishRecords(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch)
+            throws ProcessException, IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        final Executor executor = MoreExecutors.directExecutor();
+        final List<FlowFileResult> flowFileResults = new ArrayList<>();
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        for (final FlowFile flowFile : flowFileBatch) {
+            final List<ApiFuture<String>> futures = new ArrayList<>();
+            final List<String> successes = new ArrayList<>();
+            final List<Throwable> failures = new ArrayList<>();
+
+            final Map<String, String> attributes = flowFile.getAttributes();
+            final RecordReader reader = readerFactory.createRecordReader(
+                    attributes, session.read(flowFile), flowFile.getSize(), 
getLogger());
+            final RecordSet recordSet = reader.createRecordSet();
+            final RecordSchema schema = writerFactory.getSchema(attributes, 
recordSet.getSchema());
+
+            final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), schema, baos, attributes);
+            final PushBackRecordSet pushBackRecordSet = new 
PushBackRecordSet(recordSet);
+
+            while (pushBackRecordSet.isAnotherRecord()) {
+                final ApiFuture<String> apiFuture = publishOneRecord(context, 
flowFile, baos, writer, pushBackRecordSet.next());
+                futures.add(apiFuture);
+                addCallback(apiFuture, new TrackedApiFutureCallback(successes, 
failures), executor);
             }
+            flowFileResults.add(new FlowFileResult(flowFile, futures, 
successes, failures));
         }
+        finishBatch(session, stopWatch, flowFileResults);
+    }
+
+    private ApiFuture<String> publishOneRecord(
+            final ProcessContext context,
+            final FlowFile flowFile,
+            final ByteArrayOutputStream baos,
+            final RecordSetWriter writer,
+            final Record record) throws IOException {
+        baos.reset();
+        writer.write(record);
+        writer.flush();
+        return publishOneMessage(context, flowFile, baos.toByteArray());
+    }
+
+    private ApiFuture<String> publishOneMessage(final ProcessContext context,
+                                                final FlowFile flowFile,
+                                                final byte[] content) {
+        final PubsubMessage message = PubsubMessage.newBuilder()
+                .setData(ByteString.copyFrom(content))
+                .setPublishTime(Timestamp.newBuilder().build())
+                .putAllAttributes(getDynamicAttributesMap(context, flowFile))
+                .build();
+        return publisher.publish(message);
+    }
+
+    private void finishBatch(final ProcessSession session,
+                             final StopWatch stopWatch,
+                             final List<FlowFileResult> flowFileResults) {
+        final String topicName = publisher.getTopicNameString();
+        for (final FlowFileResult flowFileResult : flowFileResults) {
+            final Relationship relationship = flowFileResult.reconcile();
+            final Map<String, String> attributes = 
flowFileResult.getAttributes();
+            attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
+            final FlowFile flowFile = 
session.putAllAttributes(flowFileResult.getFlowFile(), attributes);
+            final String transitUri = String.format(TRANSIT_URI_FORMAT_STRING, 
topicName);
+            session.getProvenanceReporter().send(flowFile, transitUri, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            session.transfer(flowFile, relationship);
+        }
+    }
+
+    protected void addCallback(final ApiFuture<String> apiFuture, final 
ApiFutureCallback<? super String> callback, Executor executor) {
+        ApiFutures.addCallback(apiFuture, callback, executor);
     }
 
     @OnStopped
@@ -310,14 +468,22 @@ public class PublishGCPubSub extends 
AbstractGCPubSubWithProxyProcessor {
     }
 
     private Publisher.Builder getPublisherBuilder(ProcessContext context) {
-        final Long batchSize = context.getProperty(BATCH_SIZE).asLong();
+        final Long batchSizeThreshold = 
context.getProperty(BATCH_SIZE_THRESHOLD).asLong();
+        final long batchBytesThreshold = 
context.getProperty(BATCH_BYTES_THRESHOLD).asDataSize(DataUnit.B).longValue();
+        final Long batchDelayThreshold = 
context.getProperty(BATCH_DELAY_THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String endpoint = context.getProperty(API_ENDPOINT).getValue();
 
-        return Publisher.newBuilder(getTopicName(context))
+        final Publisher.Builder publisherBuilder = 
Publisher.newBuilder(getTopicName(context))
                 
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
                 .setChannelProvider(getTransportChannelProvider(context))
-                .setBatchingSettings(BatchingSettings.newBuilder()
-                .setElementCountThreshold(batchSize)
+                .setEndpoint(endpoint);
+
+        publisherBuilder.setBatchingSettings(BatchingSettings.newBuilder()
+                .setElementCountThreshold(batchSizeThreshold)
+                .setRequestByteThreshold(batchBytesThreshold)
+                .setDelayThreshold(Duration.ofMillis(batchDelayThreshold))
                 .setIsEnabled(true)
                 .build());
+        return publisherBuilder;
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java
index 603cfb954a..5b74f2015d 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java
@@ -112,17 +112,6 @@ public class PublishGCPubSubLite extends 
AbstractGCPubSubProcessor implements Ve
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .build();
 
-    public static final PropertyDescriptor BATCH_BYTES = new PropertyDescriptor
-            .Builder().name("gcp-batch-bytes")
-            .displayName("Batch Bytes Threshold")
-            .description("Publish request gets triggered based on this Batch 
Bytes Threshold property and"
-                    + " the " + BATCH_SIZE.getDisplayName() + " property, 
whichever condition is met first.")
-            .required(true)
-            .defaultValue("3 MB")
-            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .build();
-
     private Publisher publisher = null;
 
     @Override
@@ -130,8 +119,9 @@ public class PublishGCPubSubLite extends 
AbstractGCPubSubProcessor implements Ve
         return Collections.unmodifiableList(Arrays.asList(TOPIC_NAME,
                 GCP_CREDENTIALS_PROVIDER_SERVICE,
                 ORDERING_KEY,
-                BATCH_SIZE,
-                BATCH_BYTES));
+                BATCH_SIZE_THRESHOLD,
+                BATCH_BYTES_THRESHOLD,
+                BATCH_DELAY_THRESHOLD));
     }
 
     @Override
@@ -193,7 +183,7 @@ public class PublishGCPubSubLite extends 
AbstractGCPubSubProcessor implements Ve
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger();
+        final int flowFileCount = 
context.getProperty(BATCH_SIZE_THRESHOLD).asInteger();
         final List<FlowFile> flowFiles = session.get(flowFileCount);
 
         if (flowFiles.isEmpty()) {
@@ -290,9 +280,9 @@ public class PublishGCPubSubLite extends 
AbstractGCPubSubProcessor implements Ve
                     .setTopicPath(topicPath)
                     
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
                     .setBatchingSettings(BatchingSettings.newBuilder()
-                            
.setElementCountThreshold(context.getProperty(BATCH_SIZE).asLong())
-                            .setDelayThreshold(Duration.ofMillis(100))
-                            
.setRequestByteThreshold(context.getProperty(BATCH_BYTES).asDataSize(DataUnit.B).longValue())
+                            
.setElementCountThreshold(context.getProperty(BATCH_SIZE_THRESHOLD).asLong())
+                            
.setRequestByteThreshold(context.getProperty(BATCH_BYTES_THRESHOLD).asDataSize(DataUnit.B).longValue())
+                            
.setDelayThreshold(Duration.ofMillis(context.getProperty(BATCH_DELAY_THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS)))
                             .setIsEnabled(true)
                             .build())
                     .build();
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/FlowFileResult.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/FlowFileResult.java
new file mode 100644
index 0000000000..99300d1abe
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/FlowFileResult.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.publish;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.rpc.UnavailableException;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.gcp.pubsub.PubSubAttributes;
+import org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Tracking of an interaction from NiFi {@link PublishGCPubSub} processor to 
Google PubSub endpoint.
+ */
+public class FlowFileResult {
+    private static final Logger logger = 
LoggerFactory.getLogger(FlowFileResult.class);
+
+    private final FlowFile flowFile;
+    private final Map<String, String> attributes;
+    private final List<ApiFuture<String>> futures;
+    private final List<String> successes;
+    private final List<Throwable> failures;
+
+    public FlowFileResult(final FlowFile flowFile, final 
List<ApiFuture<String>> futures,
+                          final List<String> successes, final List<Throwable> 
failures) {
+        this.flowFile = flowFile;
+        this.attributes = new LinkedHashMap<>();
+        this.futures = futures;
+        this.successes = successes;
+        this.failures = failures;
+    }
+
+    /**
+     * After all in-flight messages have results, calculate appropriate {@link 
Relationship}.
+     */
+    public Relationship reconcile() {
+        while (futures.size() > (successes.size() + failures.size())) {
+            try {
+                ApiFutures.allAsList(futures).get();
+            } catch (InterruptedException | ExecutionException e) {
+                logger.error("Failed to reconcile PubSub send operation 
status", e);
+            }
+        }
+        if (futures.size() == successes.size()) {
+            if (futures.size() == 1) {
+                attributes.put(PubSubAttributes.MESSAGE_ID_ATTRIBUTE, 
successes.iterator().next());
+            } else {
+                attributes.put(PubSubAttributes.RECORDS_ATTRIBUTE, 
Integer.toString(futures.size()));
+            }
+        }
+        return RelationshipMapper.toRelationship(failures);
+    }
+
+    public FlowFile getFlowFile() {
+        return flowFile;
+    }
+
+    public Map<String, String> getAttributes() {
+        return attributes;
+    }
+
+    /**
+     * Logic to derive an appropriate {@link Relationship} from the feedback 
provided by the client library.
+     * <p>
+     * Each {@link com.google.pubsub.v1.PubsubMessage} is associated with a 
{@link TrackedApiFutureCallback} at time of
+     * submission to the client library.  This callback allows the client 
library to convey information to the caller
+     * about the result of the (asynchronous) send.  If a send fails, an 
appropriate exception is conveyed, providing
+     * detail about the send failure; otherwise a message id (provided by the 
service) is supplied.
+     * <p>
+     * Types of exceptions might be classified into "retryable" (another send 
may be attempted) or non-retryable.
+     */
+    private static class RelationshipMapper {
+
+        private static Relationship toRelationship(final List<Throwable> 
failures) {
+            Relationship relationship = PublishGCPubSub.REL_SUCCESS;
+            boolean isRetry = false;
+            boolean isFailure = false;
+            for (final Throwable failure : failures) {
+                if (isRetryException(failure)) {
+                    isRetry = true;
+                } else {
+                    isFailure = true;
+                    break;
+                }
+            }
+            if (isFailure) {
+                relationship = PublishGCPubSub.REL_FAILURE;
+            } else if (isRetry) {
+                relationship = PublishGCPubSub.REL_RETRY;
+            }
+            return relationship;
+        }
+
+        /**
+         * Retryable exceptions indicate transient conditions; another send 
attempt might succeed.
+         */
+        private static final Collection<Class<? extends Throwable>> 
RETRY_EXCEPTIONS = Collections.singleton(
+                UnavailableException.class);
+
+        /**
+         * Exceptions provided by client library might include a nested 
exception that indicates a transient condition,
+         * so the entire exception chain should be checked.
+         */
+        private static boolean isRetryException(final Throwable t) {
+            if (t == null) {
+                return false;
+            } else if (RETRY_EXCEPTIONS.contains(t.getClass())) {
+                return true;
+            } else {
+                final Throwable cause = t.getCause();
+                if (t.equals(cause)) {
+                    return false;
+                } else {
+                    return isRetryException(cause);
+                }
+            }
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/MessageDerivationStrategy.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/MessageDerivationStrategy.java
new file mode 100644
index 0000000000..9ff30e72f5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/MessageDerivationStrategy.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.publish;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Strategy for publishing data to GCP via <code>PublishGCPubSub</code> 
processor.
+ */
+public enum MessageDerivationStrategy implements DescribedValue {
+    FLOWFILE_ORIENTED("FlowFile Oriented",
+            "Each incoming FlowFile is sent as a Google Cloud PubSub message"),
+    RECORD_ORIENTED("Record Oriented",
+            "Each incoming FlowFile is parsed into NiFi records, which are 
each sent as a Google Cloud PubSub message");
+
+    private final String displayName;
+
+    private final String description;
+
+    MessageDerivationStrategy(final String displayName, final String 
description) {
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/TrackedApiFutureCallback.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/TrackedApiFutureCallback.java
new file mode 100644
index 0000000000..f4b9be8f5f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/TrackedApiFutureCallback.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.publish;
+
+import com.google.api.core.ApiFutureCallback;
+
+import java.util.List;
+
+/**
+ * Specialization of {@link ApiFutureCallback} used to track Google PubSub 
send results.  Failure
+ * exceptions are captured to facilitate FlowFile routing decisions.
+ */
+public class TrackedApiFutureCallback implements ApiFutureCallback<String> {
+    private final List<String> successes;
+    private final List<Throwable> failures;
+
+    public TrackedApiFutureCallback(final List<String> successes, final 
List<Throwable> failures) {
+        this.successes = successes;
+        this.failures = failures;
+    }
+
+    @Override
+    public void onFailure(final Throwable t) {
+        failures.add(t);
+    }
+
+    @Override
+    public void onSuccess(final String result) {
+        successes.add(result);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubIT.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubIT.java
index 0d7a1da562..072d8a94c8 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubIT.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubIT.java
@@ -42,7 +42,7 @@ public class ConsumeGCPubSubIT extends AbstractGCPubSubIT{
         runner.setProperty(ConsumeGCPubSub.PROJECT_ID, PROJECT_ID);
         runner.setProperty(ConsumeGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
CONTROLLER_SERVICE);
         runner.setProperty(ConsumeGCPubSub.SUBSCRIPTION, subscription);
-        runner.setProperty(ConsumeGCPubSub.BATCH_SIZE, "10");
+        runner.setProperty(ConsumeGCPubSub.BATCH_SIZE_THRESHOLD, "10");
 
         runner.assertValid();
 
@@ -64,7 +64,7 @@ public class ConsumeGCPubSubIT extends AbstractGCPubSubIT{
         runner.setProperty(ConsumeGCPubSub.PROJECT_ID, PROJECT_ID);
         runner.setProperty(ConsumeGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
CONTROLLER_SERVICE);
         runner.setProperty(ConsumeGCPubSub.SUBSCRIPTION, subscription);
-        runner.setProperty(ConsumeGCPubSub.BATCH_SIZE, "2");
+        runner.setProperty(ConsumeGCPubSub.BATCH_SIZE_THRESHOLD, "2");
 
         runner.assertValid();
 
@@ -88,7 +88,7 @@ public class ConsumeGCPubSubIT extends AbstractGCPubSubIT{
         runner.setProperty(ConsumeGCPubSub.PROJECT_ID, PROJECT_ID);
         runner.setProperty(ConsumeGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
CONTROLLER_SERVICE);
         runner.setProperty(ConsumeGCPubSub.SUBSCRIPTION, subscription);
-        runner.setProperty(ConsumeGCPubSub.BATCH_SIZE, "2");
+        runner.setProperty(ConsumeGCPubSub.BATCH_SIZE_THRESHOLD, "2");
 
         runner.assertValid();
 
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubIT.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubIT.java
index 145eb2b21a..3e6de5b657 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubIT.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubIT.java
@@ -40,7 +40,7 @@ public class PublishGCPubSubIT extends AbstractGCPubSubIT{
         runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT_ID);
         runner.setProperty(PublishGCPubSub.TOPIC_NAME, topic);
         runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
CONTROLLER_SERVICE);
-        runner.setProperty(PublishGCPubSub.BATCH_SIZE, "1");
+        runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "1");
 
         runner.assertValid();
 
@@ -61,7 +61,7 @@ public class PublishGCPubSubIT extends AbstractGCPubSubIT{
         runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT_ID);
         runner.setProperty(PublishGCPubSub.TOPIC_NAME, topic);
         runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
CONTROLLER_SERVICE);
-        runner.setProperty(PublishGCPubSub.BATCH_SIZE, "1");
+        runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "1");
 
         runner.assertValid();
 
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java
new file mode 100644
index 0000000000..2870942834
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.UnavailableException;
+import com.google.cloud.pubsub.v1.Publisher;
+import io.grpc.Status;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.processor.ProcessContext;
+import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.processors.gcp.pubsub.publish.MessageDerivationStrategy;
+import org.apache.nifi.processors.gcp.pubsub.publish.TrackedApiFutureCallback;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.Executor;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PublishGCPubSubTest {
+
+    private static final String TOPIC = "my-topic";
+    private static final String PROJECT = "my-project";
+
+    private Throwable throwable;
+    private Publisher publisherMock;
+    private TestRunner runner;
+
+    @BeforeEach
+    void setRunner() {
+        throwable = null;
+        publisherMock = mock(Publisher.class);
+        runner = TestRunners.newTestRunner(new PublishGCPubSub() {
+            @Override
+            @OnScheduled
+            public void onScheduled(ProcessContext context) {
+                publisher = publisherMock;
+            }
+
+            @Override
+            protected void addCallback(ApiFuture<String> apiFuture, 
ApiFutureCallback<? super String> callback, Executor executor) {
+                if (callback instanceof TrackedApiFutureCallback) {
+                    final TrackedApiFutureCallback apiFutureCallback = 
(TrackedApiFutureCallback) callback;
+                    if (throwable == null) {
+                        
apiFutureCallback.onSuccess(Long.toString(System.currentTimeMillis()));
+                    } else {
+                        apiFutureCallback.onFailure(throwable);
+                    }
+                }
+            }
+        });
+    }
+
+    @Test
+    void testPropertyDescriptors() throws InitializationException {
+        runner.assertNotValid();
+
+        runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
getCredentialsServiceId(runner));
+        runner.assertNotValid();
+
+        runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
+        runner.assertNotValid();
+
+        runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.API_ENDPOINT, "localhost");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.API_ENDPOINT, "localhost:443");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "-1");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "15");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.BATCH_BYTES_THRESHOLD, "3");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.BATCH_BYTES_THRESHOLD, "3 MB");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.BATCH_DELAY_THRESHOLD, "100");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.BATCH_DELAY_THRESHOLD, "100 
millis");
+        runner.assertValid();
+    }
+
+    @Test
+    void testSendOneSuccessFlowFileStrategy() throws InitializationException {
+        runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
getCredentialsServiceId(runner));
+        runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
+        runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
+
+        runner.enqueue("text");
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(PublishGCPubSub.REL_SUCCESS).iterator().next();
+        
assertNotNull(flowFile.getAttribute(PubSubAttributes.MESSAGE_ID_ATTRIBUTE));
+    }
+
+    @Test
+    void testSendOneRetryFlowFileStrategy() throws InitializationException {
+        throwable = new UnavailableException(null, 
GrpcStatusCode.of(Status.Code.UNAVAILABLE), true);
+
+        runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
getCredentialsServiceId(runner));
+        runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
+        runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
+
+        runner.enqueue("text");
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_RETRY, 1);
+    }
+
+
+    @Test
+    void testSendOneFailureFlowFileStrategy() throws InitializationException {
+        runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
getCredentialsServiceId(runner));
+        runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
+        runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
+        runner.setProperty(PublishGCPubSub.MAX_MESSAGE_SIZE, "16 B");
+        runner.enqueue("some really long text");
+
+        runner.run(1, true, true);
+        runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_FAILURE, 1);
+    }
+
+    @Test
+    void testSendOneSuccessRecordStrategy() throws InitializationException, 
IOException {
+        runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
getCredentialsServiceId(runner));
+        runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
+        runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
+        runner.setProperty(PublishGCPubSub.RECORD_READER, 
getReaderServiceId(runner));
+        runner.setProperty(PublishGCPubSub.RECORD_WRITER, 
getWriterServiceId(runner));
+        runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY, 
MessageDerivationStrategy.RECORD_ORIENTED.getValue());
+
+        runner.enqueue(IOUtils.toByteArray(Objects.requireNonNull(
+                
getClass().getClassLoader().getResource("pubsub/records.json"))));
+        runner.run(1, true, true);
+        runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(PublishGCPubSub.REL_SUCCESS).iterator().next();
+        assertEquals("3", 
flowFile.getAttribute(PubSubAttributes.RECORDS_ATTRIBUTE));
+    }
+
+    @Test
+    void testSendOneRetryRecordStrategy() throws InitializationException, 
IOException {
+        throwable = new UnavailableException(null, 
GrpcStatusCode.of(Status.Code.UNAVAILABLE), true);
+
+        runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
getCredentialsServiceId(runner));
+        runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
+        runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
+        runner.setProperty(PublishGCPubSub.RECORD_READER, 
getReaderServiceId(runner));
+        runner.setProperty(PublishGCPubSub.RECORD_WRITER, 
getWriterServiceId(runner));
+        runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY, 
MessageDerivationStrategy.RECORD_ORIENTED.getValue());
+
+        runner.enqueue(IOUtils.toByteArray(Objects.requireNonNull(
+                
getClass().getClassLoader().getResource("pubsub/records.json"))));
+        runner.run(1, true, true);
+        runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_RETRY, 1);
+    }
+
+    @Test
+    void testSendOneFailureRecordStrategy() throws InitializationException, 
IOException {
+        throwable = new 
IllegalStateException("testSendOne_Failure_RecordStrategy");
+
+        runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
getCredentialsServiceId(runner));
+        runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
+        runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
+        runner.setProperty(PublishGCPubSub.RECORD_READER, 
getReaderServiceId(runner));
+        runner.setProperty(PublishGCPubSub.RECORD_WRITER, 
getWriterServiceId(runner));
+        runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY, 
MessageDerivationStrategy.RECORD_ORIENTED.getValue());
+
+        runner.enqueue(IOUtils.toByteArray(Objects.requireNonNull(
+                
getClass().getClassLoader().getResource("pubsub/records.json"))));
+        runner.run(1, true, true);
+        runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_FAILURE, 1);
+    }
+
+    private static String getCredentialsServiceId(final TestRunner runner) 
throws InitializationException {
+        final ControllerService controllerService = 
mock(GCPCredentialsControllerService.class);
+        final String controllerServiceId = 
GCPCredentialsControllerService.class.getSimpleName();
+        
when(controllerService.getIdentifier()).thenReturn(controllerServiceId);
+        runner.addControllerService(controllerServiceId, controllerService);
+        runner.enableControllerService(controllerService);
+        return controllerServiceId;
+    }
+
+    private static String getReaderServiceId(TestRunner runner) throws 
InitializationException {
+        final ControllerService readerService = new JsonTreeReader();
+        final String readerServiceId = readerService.getClass().getName();
+        runner.addControllerService(readerServiceId, readerService);
+        runner.enableControllerService(readerService);
+        return readerServiceId;
+    }
+
+
+    private static String getWriterServiceId(TestRunner runner) throws 
InitializationException {
+        final ControllerService writerService = new JsonRecordSetWriter();
+        final String writerServiceId = writerService.getClass().getName();
+        runner.addControllerService(writerServiceId, writerService);
+        runner.enableControllerService(writerService);
+        return writerServiceId;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLiteTest.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLiteTest.java
new file mode 100644
index 0000000000..9204fc6aff
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLiteTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.lite;
+
+import org.apache.nifi.controller.ControllerService;
+import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class PublishGCPubSubLiteTest {
+
+    private TestRunner runner;
+
+    @BeforeEach
+    void setRunner() {
+        runner = TestRunners.newTestRunner(PublishGCPubSubLite.class);
+    }
+
+    @Test
+    void testPropertyDescriptors() throws InitializationException {
+        runner.assertNotValid();
+
+        final ControllerService controllerService = new 
GCPCredentialsControllerService();
+        final String controllerServiceId = 
GCPCredentialsControllerService.class.getSimpleName();
+        runner.addControllerService(controllerServiceId, controllerService);
+        runner.enableControllerService(controllerService);
+        
runner.setProperty(PublishGCPubSubLite.GCP_CREDENTIALS_PROVIDER_SERVICE, 
controllerServiceId);
+        runner.assertNotValid();
+
+        runner.setProperty(PublishGCPubSubLite.TOPIC_NAME, 
"projects/my-project/locations/my-location/topics/my-topic");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSubLite.BATCH_SIZE_THRESHOLD, "-1");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSubLite.BATCH_SIZE_THRESHOLD, "15");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSubLite.BATCH_BYTES_THRESHOLD, "3");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSubLite.BATCH_BYTES_THRESHOLD, "3 MB");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSubLite.BATCH_DELAY_THRESHOLD, "100");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSubLite.BATCH_DELAY_THRESHOLD, "100 
millis");
+        runner.assertValid();
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/pubsub/records.json
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/pubsub/records.json
new file mode 100644
index 0000000000..08ae479321
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/pubsub/records.json
@@ -0,0 +1,17 @@
+[
+  {
+    "name": "Acme1",
+    "address": "1234 First Street",
+    "zip": "12345"
+  },
+  {
+    "name": "Acme2",
+    "address": "1234 Second Street",
+    "zip": "12345"
+  },
+  {
+    "name": "Acme3",
+    "address": "1234 Third Street",
+    "zip": "12345"
+  }
+]
\ No newline at end of file

Reply via email to