exceptionfactory commented on code in PR #7274:
URL: https://github.com/apache/nifi/pull/7274#discussion_r1199202716


##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/ContentInputStrategy.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.publish;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Strategy for publishing data to GCP via <code>PublishGCPubSub</code> 
processor.
+ */
+public enum ContentInputStrategy implements DescribedValue {
+    FLOWFILE_ORIENTED("FlowFile Oriented",
+            "Each incoming FlowFile is sent as a Google Cloud PubSub message"),
+    RECORD_ORIENTED("FlowFile Record Oriented",

Review Comment:
   Should the display name be `Record Oriented`?



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLiteTest.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.lite;
+
+import org.apache.nifi.controller.ControllerService;
+import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class PublishGCPubSubLiteTest {
+
+    private TestRunner runner;
+
+    @BeforeEach
+    void setRunner() {
+        runner = TestRunners.newTestRunner(PublishGCPubSubLite.class);
+    }
+
+    @Test
+    void testPropertyDescriptors() throws InitializationException {
+        runner.assertNotValid();
+
+        final ControllerService controllerService = new 
GCPCredentialsControllerService();
+        final String controllerServiceId = 
GCPCredentialsControllerService.class.getSimpleName();
+        runner.addControllerService(controllerServiceId, controllerService);
+        runner.enableControllerService(controllerService);
+        //runner.setProperty(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE, 
controllerServiceId);

Review Comment:
   It looks like this commented line should be removed.



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java:
##########
@@ -36,16 +37,47 @@
 
 public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor 
implements VerifiableProcessor {
 
-    public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor BATCH_SIZE_THRESHOLD = new 
PropertyDescriptor.Builder()
             .name("gcp-pubsub-publish-batch-size")
-            .displayName("Batch Size")
+            .displayName("Batch Size Threshold")
             .description("Indicates the number of messages the cloud service 
should bundle together in a batch. If not set and left empty, only one message 
" +
                     "will be used in a batch")
             .required(true)
             .defaultValue("15")
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor BATCH_BYTES_THRESHOLD = new 
PropertyDescriptor.Builder()
+            .name("gcp-batch-bytes")
+            .displayName("Batch Bytes Threshold")
+            .description("Publish request gets triggered based on this Batch 
Bytes Threshold property and"
+                    + " the " + BATCH_SIZE_THRESHOLD.getDisplayName() + " 
property, whichever condition is met first.")
+            .required(true)
+            .defaultValue("3 MB")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor BATCH_DELAY_THRESHOLD = new 
PropertyDescriptor.Builder()
+            .name("gcp-pubsub-publish-batch-delay")
+            .displayName("Batch Delay Threshold")
+            .description("Indicates the delay threshold to use for batching. 
After this amount of time has elapsed " +
+                    "(counting from the first element added), the elements 
will be wrapped up in a batch and sent. " +
+                    "This value should not be set too high, usually on the 
order of milliseconds. Otherwise, calls " +
+                    "might appear to never complete.")
+            .defaultValue("100 millis")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PUBSUB_ENDPOINT = new 
PropertyDescriptor
+            .Builder().name("pubsub-api-endpoint")
+            .displayName("PubSub API Endpoint")

Review Comment:
   Since the Processor name already includes `PubSub`, this could be shortened:
   ```suggestion
               .Builder().name("API Endpoint")
               .displayName("API Endpoint")
   
   ```



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import org.apache.nifi.controller.ControllerService;
+import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class PublishGCPubSubTest {
+
+    private TestRunner runner;
+
+    @BeforeEach
+    void setRunner() {
+        runner = TestRunners.newTestRunner(PublishGCPubSub.class);
+    }
+
+    @Test
+    void testPropertyDescriptors() throws InitializationException {
+        runner.assertNotValid();
+
+        final ControllerService controllerService = new 
GCPCredentialsControllerService();
+        final String controllerServiceId = 
GCPCredentialsControllerService.class.getSimpleName();
+        runner.addControllerService(controllerServiceId, controllerService);
+        runner.enableControllerService(controllerService);
+        //runner.setProperty(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE, 
controllerServiceId);

Review Comment:
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java:
##########
@@ -36,16 +37,47 @@
 
 public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor 
implements VerifiableProcessor {
 
-    public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor BATCH_SIZE_THRESHOLD = new 
PropertyDescriptor.Builder()
             .name("gcp-pubsub-publish-batch-size")
-            .displayName("Batch Size")
+            .displayName("Batch Size Threshold")
             .description("Indicates the number of messages the cloud service 
should bundle together in a batch. If not set and left empty, only one message 
" +
                     "will be used in a batch")
             .required(true)
             .defaultValue("15")
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor BATCH_BYTES_THRESHOLD = new 
PropertyDescriptor.Builder()
+            .name("gcp-batch-bytes")
+            .displayName("Batch Bytes Threshold")
+            .description("Publish request gets triggered based on this Batch 
Bytes Threshold property and"
+                    + " the " + BATCH_SIZE_THRESHOLD.getDisplayName() + " 
property, whichever condition is met first.")
+            .required(true)
+            .defaultValue("3 MB")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor BATCH_DELAY_THRESHOLD = new 
PropertyDescriptor.Builder()
+            .name("gcp-pubsub-publish-batch-delay")
+            .displayName("Batch Delay Threshold")

Review Comment:
   ```suggestion
               .name("Batch Delay Threshold")
               .displayName("Batch Delay Threshold")
   
   ```



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java:
##########
@@ -36,16 +37,47 @@
 
 public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor 
implements VerifiableProcessor {
 
-    public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor BATCH_SIZE_THRESHOLD = new 
PropertyDescriptor.Builder()
             .name("gcp-pubsub-publish-batch-size")
-            .displayName("Batch Size")
+            .displayName("Batch Size Threshold")
             .description("Indicates the number of messages the cloud service 
should bundle together in a batch. If not set and left empty, only one message 
" +
                     "will be used in a batch")
             .required(true)
             .defaultValue("15")
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor BATCH_BYTES_THRESHOLD = new 
PropertyDescriptor.Builder()
+            .name("gcp-batch-bytes")
+            .displayName("Batch Bytes Threshold")
+            .description("Publish request gets triggered based on this Batch 
Bytes Threshold property and"
+                    + " the " + BATCH_SIZE_THRESHOLD.getDisplayName() + " 
property, whichever condition is met first.")
+            .required(true)
+            .defaultValue("3 MB")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor BATCH_DELAY_THRESHOLD = new 
PropertyDescriptor.Builder()
+            .name("gcp-pubsub-publish-batch-delay")
+            .displayName("Batch Delay Threshold")
+            .description("Indicates the delay threshold to use for batching. 
After this amount of time has elapsed " +
+                    "(counting from the first element added), the elements 
will be wrapped up in a batch and sent. " +
+                    "This value should not be set too high, usually on the 
order of milliseconds. Otherwise, calls " +
+                    "might appear to never complete.")
+            .defaultValue("100 millis")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PUBSUB_ENDPOINT = new 
PropertyDescriptor
+            .Builder().name("pubsub-api-endpoint")
+            .displayName("PubSub API Endpoint")
+            .description("Override the gRPC endpoint (form is \"host:port\").")
+            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(false)

Review Comment:
   Instead of making this an optional property, what do you think about making 
it required and setting the default value to the current internal default? That 
would help indicate the expected format in addition to the description.



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java:
##########
@@ -137,9 +211,10 @@ public Set<Relationship> getRelationships() {
     @OnScheduled
     public void onScheduled(ProcessContext context) {
         try {
+            storedException.set(null);

Review Comment:
   The `storedException` usage should be removed and failures should be thrown 
in this method, instead of allowing the framework to start the Processor.



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java:
##########
@@ -88,6 +104,56 @@
         + "will be read into memory to be sent as a PubSub message.")
 public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor {
     private static final List<String> REQUIRED_PERMISSIONS = 
Collections.singletonList("pubsub.topics.publish");
+    private static final String TRANSIT_URI_FORMAT_STRING = "gcp://%s";
+
+    public static final PropertyDescriptor MAX_BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("max-batch-size")
+            .displayName("Maximum Batch Size")
+            .description("Maximum number of FlowFiles processed for each 
Processor invocation")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.NUMBER_VALIDATOR)
+            .defaultValue("100")
+            .build();
+
+    public static final PropertyDescriptor CONTENT_INPUT_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("content-input-strategy")
+            .displayName("Content Input Strategy")
+            .description("The strategy used to publish the incoming FlowFile 
to the Google Cloud PubSub endpoint.")
+            .required(true)
+            .defaultValue(ContentInputStrategy.FLOWFILE_ORIENTED.getValue())
+            .allowableValues(ContentInputStrategy.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("The Record Reader to use for incoming FlowFiles")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .dependsOn(CONTENT_INPUT_STRATEGY, 
ContentInputStrategy.RECORD_ORIENTED.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("The Record Writer to use in order to serialize the 
data before sending to GCPubSub endpoint")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .dependsOn(CONTENT_INPUT_STRATEGY, 
ContentInputStrategy.RECORD_ORIENTED.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor MAX_MESSAGE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("max.message.size")
+            .displayName("Max Message Size")

Review Comment:
   For consistency, recommend using `Maximum` instead of `Max`.
   ```suggestion
               .name("Maximum Message Size")
               .displayName("Maximum Message Size")
   
   ```



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java:
##########
@@ -137,9 +211,10 @@ public Set<Relationship> getRelationships() {
     @OnScheduled
     public void onScheduled(ProcessContext context) {
         try {
+            storedException.set(null);
             publisher = getPublisherBuilder(context).build();
         } catch (IOException e) {
-            getLogger().error("Failed to create Google Cloud PubSub Publisher 
due to {}", new Object[]{e});
+            getLogger().error("Failed to create Google Cloud PubSub Publisher 
due to {}", e);

Review Comment:
   The `due to {}` approach should not be used because the logger writes the 
entire stack trace from the exception. This error logging should be retained 
and the exception can be thrown after logging.
   ```suggestion
               getLogger().error("Failed to create Google Cloud PubSub 
Publisher", e);
   
   ```



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java:
##########
@@ -213,61 +289,139 @@ public List<ConfigVerificationResult> verify(final 
ProcessContext context, final
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-        final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger();
-        final List<FlowFile> flowFiles = session.get(flowFileCount);
-
-        if (flowFiles.isEmpty() || publisher == null) {
-            if (storedException.get() != null) {
-                getLogger().error("Google Cloud PubSub Publisher was not 
properly created due to {}", new Object[]{storedException.get()});
-            }
+        final StopWatch stopWatch = new StopWatch(true);
+        final ContentInputStrategy inputStrategy = 
ContentInputStrategy.valueOf(context.getProperty(CONTENT_INPUT_STRATEGY).getValue());
+        final int maxBatchSize = 
context.getProperty(MAX_BATCH_SIZE).asInteger();
+        final List<FlowFile> flowFileBatch = session.get(maxBatchSize);
+        if (flowFileBatch.isEmpty()) {
             context.yield();
-            return;
+        } else if (storedException.get() != null) {
+            getLogger().error("Google Cloud PubSub Publisher was not properly 
created due to {}", storedException.get());
+            context.yield();
+        } else if 
(ContentInputStrategy.FLOWFILE_ORIENTED.equals(inputStrategy)) {
+            onTriggerFlowFileStrategy(context, session, stopWatch, 
flowFileBatch);
+        } else if (ContentInputStrategy.RECORD_ORIENTED.equals(inputStrategy)) 
{
+            onTriggerRecordStrategy(context, session, stopWatch, 
flowFileBatch);
+        } else {
+            throw new IllegalStateException(inputStrategy.getValue());
         }
+    }
+
+    private void onTriggerFlowFileStrategy(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch) throws ProcessException {
+        final long maxMessageSize = 
context.getProperty(MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).longValue();
 
-        final long startNanos = System.nanoTime();
-        final List<FlowFile> successfulFlowFiles = new ArrayList<>();
-        final String topicName = getTopicName(context).toString();
+        final MessageTracker messageTracker = new MessageTracker();
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        for (final FlowFile flowFile : flowFileBatch) {
+            if (flowFile.getSize() > maxMessageSize) {
+                final String message = String.format("FlowFile size %d exceeds 
MAX_MESSAGE_SIZE", flowFile.getSize());
+                messageTracker.add(new FlowFileResult(flowFile, 
Collections.emptyList(), new IllegalArgumentException(message)));
+            } else {
+                baos.reset();
+                session.exportTo(flowFile, baos);
+                final ApiFuture<String> future = publishOneMessage(context, 
flowFile, baos.toByteArray());
+                messageTracker.add(new FlowFileResult(flowFile, 
Collections.singletonList(future)));
+            }
+        }
+        finishBatch(session, flowFileBatch, stopWatch, messageTracker);
+    }
 
+    private void onTriggerRecordStrategy(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch) throws ProcessException {
         try {
-            for (FlowFile flowFile : flowFiles) {
-                try {
-                    final ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
-                    session.exportTo(flowFile, baos);
-                    final ByteString flowFileContent = 
ByteString.copyFrom(baos.toByteArray());
-
-                    PubsubMessage message = 
PubsubMessage.newBuilder().setData(flowFileContent)
-                            .setPublishTime(Timestamp.newBuilder().build())
-                            .putAllAttributes(getDynamicAttributesMap(context, 
flowFile))
-                            .build();
+            onTriggerRecordStrategyInner(context, session, stopWatch, 
flowFileBatch);
+        } catch (IOException | SchemaNotFoundException | 
MalformedRecordException e) {
+            throw new ProcessException(e);
+        }
+    }
 
-                    ApiFuture<String> messageIdFuture = 
publisher.publish(message);
+    private void onTriggerRecordStrategyInner(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch)
+            throws ProcessException, IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-                    final Map<String, String> attributes = new HashMap<>();
-                    attributes.put(MESSAGE_ID_ATTRIBUTE, 
messageIdFuture.get());
-                    attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
+        final MessageTracker messageTracker = new MessageTracker();
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        for (final FlowFile flowFile : flowFileBatch) {
+            final Map<String, String> attributes = flowFile.getAttributes();
+            final RecordReader reader = readerFactory.createRecordReader(
+                    attributes, session.read(flowFile), flowFile.getSize(), 
getLogger());
+            final RecordSet recordSet = reader.createRecordSet();
+            final RecordSchema schema = writerFactory.getSchema(attributes, 
recordSet.getSchema());
 
-                    flowFile = session.putAllAttributes(flowFile, attributes);
-                    successfulFlowFiles.add(flowFile);
-                } catch (InterruptedException | ExecutionException e) {
-                    if (e.getCause() instanceof DeadlineExceededException) {
-                        getLogger().error("Failed to publish the message to 
Google Cloud PubSub topic '{}' due to {} but attempting again may succeed " +
-                                        "so routing to retry", new 
Object[]{topicName, e.getLocalizedMessage()}, e);
-                        session.transfer(flowFile, REL_RETRY);
-                    } else {
-                        getLogger().error("Failed to publish the message to 
Google Cloud PubSub topic '{}'", topicName, e);
-                        session.transfer(flowFile, REL_FAILURE);
-                    }
-                    context.yield();
-                }
+            final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), schema, baos, attributes);
+            final PushBackRecordSet pushBackRecordSet = new 
PushBackRecordSet(recordSet);
+            final List<ApiFuture<String>> futures = new ArrayList<>();
+            while (pushBackRecordSet.isAnotherRecord()) {
+                final ApiFuture<String> future = publishOneRecord(context, 
flowFile, baos, writer, pushBackRecordSet.next());
+                futures.add(future);
             }
-        } finally {
-            if (!successfulFlowFiles.isEmpty()) {
-                session.transfer(successfulFlowFiles, REL_SUCCESS);
-                for (FlowFile flowFile : successfulFlowFiles) {
-                    final long transmissionMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-                    session.getProvenanceReporter().send(flowFile, topicName, 
transmissionMillis);
+            messageTracker.add(new FlowFileResult(flowFile, futures));
+            getLogger().trace("Parsing of FlowFile (ID:{}) records complete, 
now {} messages", flowFile.getId(), messageTracker.size());
+        }
+        finishBatch(session, flowFileBatch, stopWatch, messageTracker);
+    }
+
+    private ApiFuture<String> publishOneRecord(
+            final ProcessContext context,
+            final FlowFile flowFile,
+            final ByteArrayOutputStream baos,
+            final RecordSetWriter writer,
+            final Record record) throws IOException {
+        baos.reset();
+        writer.write(record);
+        writer.flush();
+        return publishOneMessage(context, flowFile, baos.toByteArray());
+    }
+
+    private ApiFuture<String> publishOneMessage(final ProcessContext context,
+                                                final FlowFile flowFile,
+                                                final byte[] content) {
+        final PubsubMessage message = PubsubMessage.newBuilder()
+                .setData(ByteString.copyFrom(content))
+                .setPublishTime(Timestamp.newBuilder().build())
+                .putAllAttributes(getDynamicAttributesMap(context, flowFile))
+                .build();
+        return publisher.publish(message);
+    }
+
+    private void finishBatch(final ProcessSession session,
+                             final List<FlowFile> flowFileBatch,
+                             final StopWatch stopWatch,
+                             final MessageTracker messageTracker) {
+        try {
+            getLogger().trace("Submit of batch complete, size {}", 
messageTracker.size());
+            final List<String> messageIdsSuccess = 
ApiFutures.successfulAsList(messageTracker.getFutures()).get();
+            getLogger().trace("Send of batch complete, success size {}", 
messageIdsSuccess.size());
+            messageTracker.reconcile(messageIdsSuccess);
+            final String topicName = publisher.getTopicNameString();
+            for (final FlowFileResult flowFileResult : 
messageTracker.getFlowFileResults()) {
+                final Map<String, String> attributes = new HashMap<>();
+                //attributes.put(MESSAGE_ID_ATTRIBUTE, messageIdFuture.get()); 
 // what to do if using record strategy?
+                attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
+                final FlowFile flowFile = 
session.putAllAttributes(flowFileResult.getFlowFile(), attributes);
+                final String transitUri = 
String.format(TRANSIT_URI_FORMAT_STRING, topicName);
+                session.getProvenanceReporter().send(flowFile, transitUri, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+                session.transfer(flowFile, flowFileResult.getRelationship());
+                if (flowFileResult.getException() != null) {
+                    getLogger().error("FlowFile send failure", 
flowFileResult.getException());
                 }
             }
+        } catch (final InterruptedException | ExecutionException e) {
+            session.rollback();

Review Comment:
   Session rollback should be removed and instead, FlowFiles should be routed 
to the appropriate location.
   
   The PubSub API does not appear to support the concept of client-controlled 
offset handling, which means the message tracker should be used to categorize 
and route FlowFiles and Records as needed. For the FlowFile strategy, the 
entire FlowFile could be routed to failure or retry. For the Record strategy, 
the best option seems to be writing success records and failure records to 
separate new FlowFiles. That would avoid duplicate messages and also support 
retrying failed messages on a per-record basis.



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java:
##########
@@ -213,61 +289,139 @@ public List<ConfigVerificationResult> verify(final 
ProcessContext context, final
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-        final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger();
-        final List<FlowFile> flowFiles = session.get(flowFileCount);
-
-        if (flowFiles.isEmpty() || publisher == null) {
-            if (storedException.get() != null) {
-                getLogger().error("Google Cloud PubSub Publisher was not 
properly created due to {}", new Object[]{storedException.get()});
-            }
+        final StopWatch stopWatch = new StopWatch(true);
+        final ContentInputStrategy inputStrategy = 
ContentInputStrategy.valueOf(context.getProperty(CONTENT_INPUT_STRATEGY).getValue());
+        final int maxBatchSize = 
context.getProperty(MAX_BATCH_SIZE).asInteger();
+        final List<FlowFile> flowFileBatch = session.get(maxBatchSize);
+        if (flowFileBatch.isEmpty()) {
             context.yield();
-            return;
+        } else if (storedException.get() != null) {
+            getLogger().error("Google Cloud PubSub Publisher was not properly 
created due to {}", storedException.get());
+            context.yield();
+        } else if 
(ContentInputStrategy.FLOWFILE_ORIENTED.equals(inputStrategy)) {
+            onTriggerFlowFileStrategy(context, session, stopWatch, 
flowFileBatch);
+        } else if (ContentInputStrategy.RECORD_ORIENTED.equals(inputStrategy)) 
{
+            onTriggerRecordStrategy(context, session, stopWatch, 
flowFileBatch);
+        } else {
+            throw new IllegalStateException(inputStrategy.getValue());
         }
+    }
+
+    private void onTriggerFlowFileStrategy(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch) throws ProcessException {
+        final long maxMessageSize = 
context.getProperty(MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).longValue();
 
-        final long startNanos = System.nanoTime();
-        final List<FlowFile> successfulFlowFiles = new ArrayList<>();
-        final String topicName = getTopicName(context).toString();
+        final MessageTracker messageTracker = new MessageTracker();
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        for (final FlowFile flowFile : flowFileBatch) {
+            if (flowFile.getSize() > maxMessageSize) {
+                final String message = String.format("FlowFile size %d exceeds 
MAX_MESSAGE_SIZE", flowFile.getSize());
+                messageTracker.add(new FlowFileResult(flowFile, 
Collections.emptyList(), new IllegalArgumentException(message)));
+            } else {
+                baos.reset();
+                session.exportTo(flowFile, baos);
+                final ApiFuture<String> future = publishOneMessage(context, 
flowFile, baos.toByteArray());
+                messageTracker.add(new FlowFileResult(flowFile, 
Collections.singletonList(future)));
+            }
+        }
+        finishBatch(session, flowFileBatch, stopWatch, messageTracker);
+    }
 
+    private void onTriggerRecordStrategy(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch) throws ProcessException {
         try {
-            for (FlowFile flowFile : flowFiles) {
-                try {
-                    final ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
-                    session.exportTo(flowFile, baos);
-                    final ByteString flowFileContent = 
ByteString.copyFrom(baos.toByteArray());
-
-                    PubsubMessage message = 
PubsubMessage.newBuilder().setData(flowFileContent)
-                            .setPublishTime(Timestamp.newBuilder().build())
-                            .putAllAttributes(getDynamicAttributesMap(context, 
flowFile))
-                            .build();
+            onTriggerRecordStrategyInner(context, session, stopWatch, 
flowFileBatch);
+        } catch (IOException | SchemaNotFoundException | 
MalformedRecordException e) {
+            throw new ProcessException(e);
+        }
+    }
 
-                    ApiFuture<String> messageIdFuture = 
publisher.publish(message);
+    private void onTriggerRecordStrategyInner(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch)
+            throws ProcessException, IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-                    final Map<String, String> attributes = new HashMap<>();
-                    attributes.put(MESSAGE_ID_ATTRIBUTE, 
messageIdFuture.get());
-                    attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
+        final MessageTracker messageTracker = new MessageTracker();
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        for (final FlowFile flowFile : flowFileBatch) {
+            final Map<String, String> attributes = flowFile.getAttributes();
+            final RecordReader reader = readerFactory.createRecordReader(
+                    attributes, session.read(flowFile), flowFile.getSize(), 
getLogger());
+            final RecordSet recordSet = reader.createRecordSet();
+            final RecordSchema schema = writerFactory.getSchema(attributes, 
recordSet.getSchema());
 
-                    flowFile = session.putAllAttributes(flowFile, attributes);
-                    successfulFlowFiles.add(flowFile);
-                } catch (InterruptedException | ExecutionException e) {
-                    if (e.getCause() instanceof DeadlineExceededException) {
-                        getLogger().error("Failed to publish the message to 
Google Cloud PubSub topic '{}' due to {} but attempting again may succeed " +
-                                        "so routing to retry", new 
Object[]{topicName, e.getLocalizedMessage()}, e);
-                        session.transfer(flowFile, REL_RETRY);
-                    } else {
-                        getLogger().error("Failed to publish the message to 
Google Cloud PubSub topic '{}'", topicName, e);
-                        session.transfer(flowFile, REL_FAILURE);
-                    }
-                    context.yield();
-                }
+            final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), schema, baos, attributes);
+            final PushBackRecordSet pushBackRecordSet = new 
PushBackRecordSet(recordSet);
+            final List<ApiFuture<String>> futures = new ArrayList<>();
+            while (pushBackRecordSet.isAnotherRecord()) {
+                final ApiFuture<String> future = publishOneRecord(context, 
flowFile, baos, writer, pushBackRecordSet.next());
+                futures.add(future);
             }
-        } finally {
-            if (!successfulFlowFiles.isEmpty()) {
-                session.transfer(successfulFlowFiles, REL_SUCCESS);
-                for (FlowFile flowFile : successfulFlowFiles) {
-                    final long transmissionMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-                    session.getProvenanceReporter().send(flowFile, topicName, 
transmissionMillis);
+            messageTracker.add(new FlowFileResult(flowFile, futures));
+            getLogger().trace("Parsing of FlowFile (ID:{}) records complete, 
now {} messages", flowFile.getId(), messageTracker.size());

Review Comment:
   This seems better as a `debug` message instead of trace. The FlowFile object 
has a reasonable `toString()` method which provides a better representation for 
logging.
   ```suggestion
               getLogger().trace("Parsing Records in {} completed: {} messages 
tracked", flowFile, messageTracker.size());
   
   ```



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java:
##########
@@ -213,61 +289,139 @@ public List<ConfigVerificationResult> verify(final 
ProcessContext context, final
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-        final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger();
-        final List<FlowFile> flowFiles = session.get(flowFileCount);
-
-        if (flowFiles.isEmpty() || publisher == null) {
-            if (storedException.get() != null) {
-                getLogger().error("Google Cloud PubSub Publisher was not 
properly created due to {}", new Object[]{storedException.get()});
-            }
+        final StopWatch stopWatch = new StopWatch(true);
+        final ContentInputStrategy inputStrategy = 
ContentInputStrategy.valueOf(context.getProperty(CONTENT_INPUT_STRATEGY).getValue());
+        final int maxBatchSize = 
context.getProperty(MAX_BATCH_SIZE).asInteger();
+        final List<FlowFile> flowFileBatch = session.get(maxBatchSize);
+        if (flowFileBatch.isEmpty()) {
             context.yield();
-            return;
+        } else if (storedException.get() != null) {
+            getLogger().error("Google Cloud PubSub Publisher was not properly 
created due to {}", storedException.get());
+            context.yield();
+        } else if 
(ContentInputStrategy.FLOWFILE_ORIENTED.equals(inputStrategy)) {
+            onTriggerFlowFileStrategy(context, session, stopWatch, 
flowFileBatch);
+        } else if (ContentInputStrategy.RECORD_ORIENTED.equals(inputStrategy)) 
{
+            onTriggerRecordStrategy(context, session, stopWatch, 
flowFileBatch);
+        } else {
+            throw new IllegalStateException(inputStrategy.getValue());
         }
+    }
+
+    private void onTriggerFlowFileStrategy(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch) throws ProcessException {
+        final long maxMessageSize = 
context.getProperty(MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).longValue();
 
-        final long startNanos = System.nanoTime();
-        final List<FlowFile> successfulFlowFiles = new ArrayList<>();
-        final String topicName = getTopicName(context).toString();
+        final MessageTracker messageTracker = new MessageTracker();
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        for (final FlowFile flowFile : flowFileBatch) {
+            if (flowFile.getSize() > maxMessageSize) {
+                final String message = String.format("FlowFile size %d exceeds 
MAX_MESSAGE_SIZE", flowFile.getSize());
+                messageTracker.add(new FlowFileResult(flowFile, 
Collections.emptyList(), new IllegalArgumentException(message)));
+            } else {
+                baos.reset();
+                session.exportTo(flowFile, baos);
+                final ApiFuture<String> future = publishOneMessage(context, 
flowFile, baos.toByteArray());
+                messageTracker.add(new FlowFileResult(flowFile, 
Collections.singletonList(future)));
+            }
+        }
+        finishBatch(session, flowFileBatch, stopWatch, messageTracker);
+    }
 
+    private void onTriggerRecordStrategy(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch) throws ProcessException {
         try {
-            for (FlowFile flowFile : flowFiles) {
-                try {
-                    final ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
-                    session.exportTo(flowFile, baos);
-                    final ByteString flowFileContent = 
ByteString.copyFrom(baos.toByteArray());
-
-                    PubsubMessage message = 
PubsubMessage.newBuilder().setData(flowFileContent)
-                            .setPublishTime(Timestamp.newBuilder().build())
-                            .putAllAttributes(getDynamicAttributesMap(context, 
flowFile))
-                            .build();
+            onTriggerRecordStrategyInner(context, session, stopWatch, 
flowFileBatch);
+        } catch (IOException | SchemaNotFoundException | 
MalformedRecordException e) {
+            throw new ProcessException(e);
+        }
+    }
 
-                    ApiFuture<String> messageIdFuture = 
publisher.publish(message);
+    private void onTriggerRecordStrategyInner(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch)
+            throws ProcessException, IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-                    final Map<String, String> attributes = new HashMap<>();
-                    attributes.put(MESSAGE_ID_ATTRIBUTE, 
messageIdFuture.get());
-                    attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
+        final MessageTracker messageTracker = new MessageTracker();
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        for (final FlowFile flowFile : flowFileBatch) {
+            final Map<String, String> attributes = flowFile.getAttributes();
+            final RecordReader reader = readerFactory.createRecordReader(
+                    attributes, session.read(flowFile), flowFile.getSize(), 
getLogger());
+            final RecordSet recordSet = reader.createRecordSet();
+            final RecordSchema schema = writerFactory.getSchema(attributes, 
recordSet.getSchema());
 
-                    flowFile = session.putAllAttributes(flowFile, attributes);
-                    successfulFlowFiles.add(flowFile);
-                } catch (InterruptedException | ExecutionException e) {
-                    if (e.getCause() instanceof DeadlineExceededException) {
-                        getLogger().error("Failed to publish the message to 
Google Cloud PubSub topic '{}' due to {} but attempting again may succeed " +
-                                        "so routing to retry", new 
Object[]{topicName, e.getLocalizedMessage()}, e);
-                        session.transfer(flowFile, REL_RETRY);
-                    } else {
-                        getLogger().error("Failed to publish the message to 
Google Cloud PubSub topic '{}'", topicName, e);
-                        session.transfer(flowFile, REL_FAILURE);
-                    }
-                    context.yield();
-                }
+            final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), schema, baos, attributes);
+            final PushBackRecordSet pushBackRecordSet = new 
PushBackRecordSet(recordSet);
+            final List<ApiFuture<String>> futures = new ArrayList<>();
+            while (pushBackRecordSet.isAnotherRecord()) {
+                final ApiFuture<String> future = publishOneRecord(context, 
flowFile, baos, writer, pushBackRecordSet.next());
+                futures.add(future);
             }
-        } finally {
-            if (!successfulFlowFiles.isEmpty()) {
-                session.transfer(successfulFlowFiles, REL_SUCCESS);
-                for (FlowFile flowFile : successfulFlowFiles) {
-                    final long transmissionMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-                    session.getProvenanceReporter().send(flowFile, topicName, 
transmissionMillis);
+            messageTracker.add(new FlowFileResult(flowFile, futures));
+            getLogger().trace("Parsing of FlowFile (ID:{}) records complete, 
now {} messages", flowFile.getId(), messageTracker.size());
+        }
+        finishBatch(session, flowFileBatch, stopWatch, messageTracker);
+    }
+
+    private ApiFuture<String> publishOneRecord(
+            final ProcessContext context,
+            final FlowFile flowFile,
+            final ByteArrayOutputStream baos,
+            final RecordSetWriter writer,
+            final Record record) throws IOException {
+        baos.reset();
+        writer.write(record);
+        writer.flush();
+        return publishOneMessage(context, flowFile, baos.toByteArray());
+    }
+
+    private ApiFuture<String> publishOneMessage(final ProcessContext context,
+                                                final FlowFile flowFile,
+                                                final byte[] content) {
+        final PubsubMessage message = PubsubMessage.newBuilder()
+                .setData(ByteString.copyFrom(content))
+                .setPublishTime(Timestamp.newBuilder().build())
+                .putAllAttributes(getDynamicAttributesMap(context, flowFile))
+                .build();
+        return publisher.publish(message);
+    }
+
+    private void finishBatch(final ProcessSession session,
+                             final List<FlowFile> flowFileBatch,
+                             final StopWatch stopWatch,
+                             final MessageTracker messageTracker) {
+        try {
+            getLogger().trace("Submit of batch complete, size {}", 
messageTracker.size());
+            final List<String> messageIdsSuccess = 
ApiFutures.successfulAsList(messageTracker.getFutures()).get();
+            getLogger().trace("Send of batch complete, success size {}", 
messageIdsSuccess.size());
+            messageTracker.reconcile(messageIdsSuccess);
+            final String topicName = publisher.getTopicNameString();
+            for (final FlowFileResult flowFileResult : 
messageTracker.getFlowFileResults()) {
+                final Map<String, String> attributes = new HashMap<>();

Review Comment:
   ```suggestion
                   final Map<String, String> attributes = new LinkedHashMap<>();
   
   ```



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java:
##########
@@ -36,16 +37,47 @@
 
 public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor 
implements VerifiableProcessor {
 
-    public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor BATCH_SIZE_THRESHOLD = new 
PropertyDescriptor.Builder()
             .name("gcp-pubsub-publish-batch-size")
-            .displayName("Batch Size")
+            .displayName("Batch Size Threshold")
             .description("Indicates the number of messages the cloud service 
should bundle together in a batch. If not set and left empty, only one message 
" +
                     "will be used in a batch")
             .required(true)
             .defaultValue("15")
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor BATCH_BYTES_THRESHOLD = new 
PropertyDescriptor.Builder()
+            .name("gcp-batch-bytes")
+            .displayName("Batch Bytes Threshold")

Review Comment:
   Recommend aligning property name and display name on these new properties.
   ```suggestion
               .name("Batch Bytes Threshold")
               .displayName("Batch Bytes Threshold")
   
   ```



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java:
##########
@@ -88,6 +104,56 @@
         + "will be read into memory to be sent as a PubSub message.")
 public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor {
     private static final List<String> REQUIRED_PERMISSIONS = 
Collections.singletonList("pubsub.topics.publish");
+    private static final String TRANSIT_URI_FORMAT_STRING = "gcp://%s";
+
+    public static final PropertyDescriptor MAX_BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("max-batch-size")
+            .displayName("Maximum Batch Size")
+            .description("Maximum number of FlowFiles processed for each 
Processor invocation")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.NUMBER_VALIDATOR)
+            .defaultValue("100")
+            .build();
+
+    public static final PropertyDescriptor CONTENT_INPUT_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("content-input-strategy")

Review Comment:
   ```suggestion
               .name("Content Input Strategy")
   ```



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java:
##########
@@ -36,16 +37,47 @@
 
 public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor 
implements VerifiableProcessor {
 
-    public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor BATCH_SIZE_THRESHOLD = new 
PropertyDescriptor.Builder()
             .name("gcp-pubsub-publish-batch-size")
-            .displayName("Batch Size")
+            .displayName("Batch Size Threshold")
             .description("Indicates the number of messages the cloud service 
should bundle together in a batch. If not set and left empty, only one message 
" +
                     "will be used in a batch")
             .required(true)
             .defaultValue("15")
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor BATCH_BYTES_THRESHOLD = new 
PropertyDescriptor.Builder()
+            .name("gcp-batch-bytes")
+            .displayName("Batch Bytes Threshold")
+            .description("Publish request gets triggered based on this Batch 
Bytes Threshold property and"
+                    + " the " + BATCH_SIZE_THRESHOLD.getDisplayName() + " 
property, whichever condition is met first.")
+            .required(true)
+            .defaultValue("3 MB")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor BATCH_DELAY_THRESHOLD = new 
PropertyDescriptor.Builder()
+            .name("gcp-pubsub-publish-batch-delay")
+            .displayName("Batch Delay Threshold")
+            .description("Indicates the delay threshold to use for batching. 
After this amount of time has elapsed " +
+                    "(counting from the first element added), the elements 
will be wrapped up in a batch and sent. " +
+                    "This value should not be set too high, usually on the 
order of milliseconds. Otherwise, calls " +
+                    "might appear to never complete.")
+            .defaultValue("100 millis")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PUBSUB_ENDPOINT = new 
PropertyDescriptor
+            .Builder().name("pubsub-api-endpoint")
+            .displayName("PubSub API Endpoint")
+            .description("Override the gRPC endpoint (form is \"host:port\").")

Review Comment:
   Recommend adjusting the example to avoid escaping.
   ```suggestion
               .description("Override the gRPC endpoint in the form of 
[host:port]")
   
   ```



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java:
##########
@@ -88,6 +104,56 @@
         + "will be read into memory to be sent as a PubSub message.")
 public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor {
     private static final List<String> REQUIRED_PERMISSIONS = 
Collections.singletonList("pubsub.topics.publish");
+    private static final String TRANSIT_URI_FORMAT_STRING = "gcp://%s";
+
+    public static final PropertyDescriptor MAX_BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("max-batch-size")
+            .displayName("Maximum Batch Size")

Review Comment:
   For clarity, what do you think about naming this `Input Batch Size`? That 
would help differentiate it from message batching output sizes.



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java:
##########
@@ -213,61 +289,139 @@ public List<ConfigVerificationResult> verify(final 
ProcessContext context, final
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-        final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger();
-        final List<FlowFile> flowFiles = session.get(flowFileCount);
-
-        if (flowFiles.isEmpty() || publisher == null) {
-            if (storedException.get() != null) {
-                getLogger().error("Google Cloud PubSub Publisher was not 
properly created due to {}", new Object[]{storedException.get()});
-            }
+        final StopWatch stopWatch = new StopWatch(true);
+        final ContentInputStrategy inputStrategy = 
ContentInputStrategy.valueOf(context.getProperty(CONTENT_INPUT_STRATEGY).getValue());
+        final int maxBatchSize = 
context.getProperty(MAX_BATCH_SIZE).asInteger();
+        final List<FlowFile> flowFileBatch = session.get(maxBatchSize);
+        if (flowFileBatch.isEmpty()) {
             context.yield();
-            return;
+        } else if (storedException.get() != null) {
+            getLogger().error("Google Cloud PubSub Publisher was not properly 
created due to {}", storedException.get());
+            context.yield();

Review Comment:
   As mentioned above, use of `storedException` should be removed.



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java:
##########
@@ -88,6 +104,56 @@
         + "will be read into memory to be sent as a PubSub message.")
 public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor {
     private static final List<String> REQUIRED_PERMISSIONS = 
Collections.singletonList("pubsub.topics.publish");
+    private static final String TRANSIT_URI_FORMAT_STRING = "gcp://%s";
+
+    public static final PropertyDescriptor MAX_BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("max-batch-size")
+            .displayName("Maximum Batch Size")
+            .description("Maximum number of FlowFiles processed for each 
Processor invocation")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.NUMBER_VALIDATOR)
+            .defaultValue("100")
+            .build();
+
+    public static final PropertyDescriptor CONTENT_INPUT_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("content-input-strategy")
+            .displayName("Content Input Strategy")
+            .description("The strategy used to publish the incoming FlowFile 
to the Google Cloud PubSub endpoint.")
+            .required(true)
+            .defaultValue(ContentInputStrategy.FLOWFILE_ORIENTED.getValue())
+            .allowableValues(ContentInputStrategy.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("The Record Reader to use for incoming FlowFiles")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .dependsOn(CONTENT_INPUT_STRATEGY, 
ContentInputStrategy.RECORD_ORIENTED.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("The Record Writer to use in order to serialize the 
data before sending to GCPubSub endpoint")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .dependsOn(CONTENT_INPUT_STRATEGY, 
ContentInputStrategy.RECORD_ORIENTED.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor MAX_MESSAGE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("max.message.size")
+            .displayName("Max Message Size")
+            .description("The maximum size of a GCPubSub message in bytes. 
Defaults to 1 MB (1048576).")

Review Comment:
   ```suggestion
               .description("The maximum size of a Google PubSub message in 
bytes. Defaults to 1 MB (1048576 bytes)")
   
   ```



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java:
##########
@@ -213,61 +289,139 @@ public List<ConfigVerificationResult> verify(final 
ProcessContext context, final
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-        final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger();
-        final List<FlowFile> flowFiles = session.get(flowFileCount);
-
-        if (flowFiles.isEmpty() || publisher == null) {
-            if (storedException.get() != null) {
-                getLogger().error("Google Cloud PubSub Publisher was not 
properly created due to {}", new Object[]{storedException.get()});
-            }
+        final StopWatch stopWatch = new StopWatch(true);
+        final ContentInputStrategy inputStrategy = 
ContentInputStrategy.valueOf(context.getProperty(CONTENT_INPUT_STRATEGY).getValue());
+        final int maxBatchSize = 
context.getProperty(MAX_BATCH_SIZE).asInteger();
+        final List<FlowFile> flowFileBatch = session.get(maxBatchSize);
+        if (flowFileBatch.isEmpty()) {
             context.yield();
-            return;
+        } else if (storedException.get() != null) {
+            getLogger().error("Google Cloud PubSub Publisher was not properly 
created due to {}", storedException.get());
+            context.yield();
+        } else if 
(ContentInputStrategy.FLOWFILE_ORIENTED.equals(inputStrategy)) {
+            onTriggerFlowFileStrategy(context, session, stopWatch, 
flowFileBatch);
+        } else if (ContentInputStrategy.RECORD_ORIENTED.equals(inputStrategy)) 
{
+            onTriggerRecordStrategy(context, session, stopWatch, 
flowFileBatch);
+        } else {
+            throw new IllegalStateException(inputStrategy.getValue());
         }
+    }
+
+    private void onTriggerFlowFileStrategy(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch) throws ProcessException {
+        final long maxMessageSize = 
context.getProperty(MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).longValue();
 
-        final long startNanos = System.nanoTime();
-        final List<FlowFile> successfulFlowFiles = new ArrayList<>();
-        final String topicName = getTopicName(context).toString();
+        final MessageTracker messageTracker = new MessageTracker();
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        for (final FlowFile flowFile : flowFileBatch) {
+            if (flowFile.getSize() > maxMessageSize) {
+                final String message = String.format("FlowFile size %d exceeds 
MAX_MESSAGE_SIZE", flowFile.getSize());
+                messageTracker.add(new FlowFileResult(flowFile, 
Collections.emptyList(), new IllegalArgumentException(message)));
+            } else {
+                baos.reset();
+                session.exportTo(flowFile, baos);
+                final ApiFuture<String> future = publishOneMessage(context, 
flowFile, baos.toByteArray());
+                messageTracker.add(new FlowFileResult(flowFile, 
Collections.singletonList(future)));
+            }
+        }
+        finishBatch(session, flowFileBatch, stopWatch, messageTracker);
+    }
 
+    private void onTriggerRecordStrategy(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch) throws ProcessException {
         try {
-            for (FlowFile flowFile : flowFiles) {
-                try {
-                    final ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
-                    session.exportTo(flowFile, baos);
-                    final ByteString flowFileContent = 
ByteString.copyFrom(baos.toByteArray());
-
-                    PubsubMessage message = 
PubsubMessage.newBuilder().setData(flowFileContent)
-                            .setPublishTime(Timestamp.newBuilder().build())
-                            .putAllAttributes(getDynamicAttributesMap(context, 
flowFile))
-                            .build();
+            onTriggerRecordStrategyInner(context, session, stopWatch, 
flowFileBatch);
+        } catch (IOException | SchemaNotFoundException | 
MalformedRecordException e) {
+            throw new ProcessException(e);

Review Comment:
   Exceptions should always include a clarifying message. Recommend adding 
something thing like, "Record publishing failed".
   ```suggestion
               throw new ProcessException("Record publishing failed", e);
   
   ```



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java:
##########
@@ -213,61 +289,139 @@ public List<ConfigVerificationResult> verify(final 
ProcessContext context, final
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-        final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger();
-        final List<FlowFile> flowFiles = session.get(flowFileCount);
-
-        if (flowFiles.isEmpty() || publisher == null) {
-            if (storedException.get() != null) {
-                getLogger().error("Google Cloud PubSub Publisher was not 
properly created due to {}", new Object[]{storedException.get()});
-            }
+        final StopWatch stopWatch = new StopWatch(true);
+        final ContentInputStrategy inputStrategy = 
ContentInputStrategy.valueOf(context.getProperty(CONTENT_INPUT_STRATEGY).getValue());
+        final int maxBatchSize = 
context.getProperty(MAX_BATCH_SIZE).asInteger();
+        final List<FlowFile> flowFileBatch = session.get(maxBatchSize);
+        if (flowFileBatch.isEmpty()) {
             context.yield();
-            return;
+        } else if (storedException.get() != null) {
+            getLogger().error("Google Cloud PubSub Publisher was not properly 
created due to {}", storedException.get());
+            context.yield();
+        } else if 
(ContentInputStrategy.FLOWFILE_ORIENTED.equals(inputStrategy)) {
+            onTriggerFlowFileStrategy(context, session, stopWatch, 
flowFileBatch);
+        } else if (ContentInputStrategy.RECORD_ORIENTED.equals(inputStrategy)) 
{
+            onTriggerRecordStrategy(context, session, stopWatch, 
flowFileBatch);
+        } else {
+            throw new IllegalStateException(inputStrategy.getValue());
         }
+    }
+
+    private void onTriggerFlowFileStrategy(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch) throws ProcessException {
+        final long maxMessageSize = 
context.getProperty(MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).longValue();
 
-        final long startNanos = System.nanoTime();
-        final List<FlowFile> successfulFlowFiles = new ArrayList<>();
-        final String topicName = getTopicName(context).toString();
+        final MessageTracker messageTracker = new MessageTracker();
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        for (final FlowFile flowFile : flowFileBatch) {
+            if (flowFile.getSize() > maxMessageSize) {
+                final String message = String.format("FlowFile size %d exceeds 
MAX_MESSAGE_SIZE", flowFile.getSize());
+                messageTracker.add(new FlowFileResult(flowFile, 
Collections.emptyList(), new IllegalArgumentException(message)));
+            } else {
+                baos.reset();
+                session.exportTo(flowFile, baos);
+                final ApiFuture<String> future = publishOneMessage(context, 
flowFile, baos.toByteArray());
+                messageTracker.add(new FlowFileResult(flowFile, 
Collections.singletonList(future)));
+            }
+        }
+        finishBatch(session, flowFileBatch, stopWatch, messageTracker);
+    }
 
+    private void onTriggerRecordStrategy(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch) throws ProcessException {
         try {
-            for (FlowFile flowFile : flowFiles) {
-                try {
-                    final ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
-                    session.exportTo(flowFile, baos);
-                    final ByteString flowFileContent = 
ByteString.copyFrom(baos.toByteArray());
-
-                    PubsubMessage message = 
PubsubMessage.newBuilder().setData(flowFileContent)
-                            .setPublishTime(Timestamp.newBuilder().build())
-                            .putAllAttributes(getDynamicAttributesMap(context, 
flowFile))
-                            .build();
+            onTriggerRecordStrategyInner(context, session, stopWatch, 
flowFileBatch);
+        } catch (IOException | SchemaNotFoundException | 
MalformedRecordException e) {
+            throw new ProcessException(e);
+        }
+    }
 
-                    ApiFuture<String> messageIdFuture = 
publisher.publish(message);
+    private void onTriggerRecordStrategyInner(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch)
+            throws ProcessException, IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-                    final Map<String, String> attributes = new HashMap<>();
-                    attributes.put(MESSAGE_ID_ATTRIBUTE, 
messageIdFuture.get());
-                    attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
+        final MessageTracker messageTracker = new MessageTracker();
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        for (final FlowFile flowFile : flowFileBatch) {
+            final Map<String, String> attributes = flowFile.getAttributes();
+            final RecordReader reader = readerFactory.createRecordReader(
+                    attributes, session.read(flowFile), flowFile.getSize(), 
getLogger());
+            final RecordSet recordSet = reader.createRecordSet();
+            final RecordSchema schema = writerFactory.getSchema(attributes, 
recordSet.getSchema());
 
-                    flowFile = session.putAllAttributes(flowFile, attributes);
-                    successfulFlowFiles.add(flowFile);
-                } catch (InterruptedException | ExecutionException e) {
-                    if (e.getCause() instanceof DeadlineExceededException) {
-                        getLogger().error("Failed to publish the message to 
Google Cloud PubSub topic '{}' due to {} but attempting again may succeed " +
-                                        "so routing to retry", new 
Object[]{topicName, e.getLocalizedMessage()}, e);
-                        session.transfer(flowFile, REL_RETRY);
-                    } else {
-                        getLogger().error("Failed to publish the message to 
Google Cloud PubSub topic '{}'", topicName, e);
-                        session.transfer(flowFile, REL_FAILURE);
-                    }
-                    context.yield();
-                }
+            final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), schema, baos, attributes);
+            final PushBackRecordSet pushBackRecordSet = new 
PushBackRecordSet(recordSet);
+            final List<ApiFuture<String>> futures = new ArrayList<>();
+            while (pushBackRecordSet.isAnotherRecord()) {
+                final ApiFuture<String> future = publishOneRecord(context, 
flowFile, baos, writer, pushBackRecordSet.next());
+                futures.add(future);
             }
-        } finally {
-            if (!successfulFlowFiles.isEmpty()) {
-                session.transfer(successfulFlowFiles, REL_SUCCESS);
-                for (FlowFile flowFile : successfulFlowFiles) {
-                    final long transmissionMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-                    session.getProvenanceReporter().send(flowFile, topicName, 
transmissionMillis);
+            messageTracker.add(new FlowFileResult(flowFile, futures));
+            getLogger().trace("Parsing of FlowFile (ID:{}) records complete, 
now {} messages", flowFile.getId(), messageTracker.size());
+        }
+        finishBatch(session, flowFileBatch, stopWatch, messageTracker);
+    }
+
+    private ApiFuture<String> publishOneRecord(
+            final ProcessContext context,
+            final FlowFile flowFile,
+            final ByteArrayOutputStream baos,
+            final RecordSetWriter writer,
+            final Record record) throws IOException {
+        baos.reset();
+        writer.write(record);
+        writer.flush();
+        return publishOneMessage(context, flowFile, baos.toByteArray());
+    }
+
+    private ApiFuture<String> publishOneMessage(final ProcessContext context,
+                                                final FlowFile flowFile,
+                                                final byte[] content) {
+        final PubsubMessage message = PubsubMessage.newBuilder()
+                .setData(ByteString.copyFrom(content))
+                .setPublishTime(Timestamp.newBuilder().build())
+                .putAllAttributes(getDynamicAttributesMap(context, flowFile))
+                .build();
+        return publisher.publish(message);
+    }
+
+    private void finishBatch(final ProcessSession session,
+                             final List<FlowFile> flowFileBatch,
+                             final StopWatch stopWatch,
+                             final MessageTracker messageTracker) {
+        try {
+            getLogger().trace("Submit of batch complete, size {}", 
messageTracker.size());
+            final List<String> messageIdsSuccess = 
ApiFutures.successfulAsList(messageTracker.getFutures()).get();
+            getLogger().trace("Send of batch complete, success size {}", 
messageIdsSuccess.size());

Review Comment:
   Recommend adjusting the logging in include the batch size in this log.
   ```suggestion
               getLogger().debug("Successful Messages [{}] of Batched Messages 
[{}]", messageTracker.size(), messageIdsSuccess.size());
   
   ```



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java:
##########
@@ -213,61 +289,139 @@ public List<ConfigVerificationResult> verify(final 
ProcessContext context, final
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-        final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger();
-        final List<FlowFile> flowFiles = session.get(flowFileCount);
-
-        if (flowFiles.isEmpty() || publisher == null) {
-            if (storedException.get() != null) {
-                getLogger().error("Google Cloud PubSub Publisher was not 
properly created due to {}", new Object[]{storedException.get()});
-            }
+        final StopWatch stopWatch = new StopWatch(true);
+        final ContentInputStrategy inputStrategy = 
ContentInputStrategy.valueOf(context.getProperty(CONTENT_INPUT_STRATEGY).getValue());
+        final int maxBatchSize = 
context.getProperty(MAX_BATCH_SIZE).asInteger();
+        final List<FlowFile> flowFileBatch = session.get(maxBatchSize);
+        if (flowFileBatch.isEmpty()) {
             context.yield();
-            return;
+        } else if (storedException.get() != null) {
+            getLogger().error("Google Cloud PubSub Publisher was not properly 
created due to {}", storedException.get());
+            context.yield();
+        } else if 
(ContentInputStrategy.FLOWFILE_ORIENTED.equals(inputStrategy)) {
+            onTriggerFlowFileStrategy(context, session, stopWatch, 
flowFileBatch);
+        } else if (ContentInputStrategy.RECORD_ORIENTED.equals(inputStrategy)) 
{
+            onTriggerRecordStrategy(context, session, stopWatch, 
flowFileBatch);
+        } else {
+            throw new IllegalStateException(inputStrategy.getValue());
         }
+    }
+
+    private void onTriggerFlowFileStrategy(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch) throws ProcessException {
+        final long maxMessageSize = 
context.getProperty(MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).longValue();
 
-        final long startNanos = System.nanoTime();
-        final List<FlowFile> successfulFlowFiles = new ArrayList<>();
-        final String topicName = getTopicName(context).toString();
+        final MessageTracker messageTracker = new MessageTracker();
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        for (final FlowFile flowFile : flowFileBatch) {
+            if (flowFile.getSize() > maxMessageSize) {
+                final String message = String.format("FlowFile size %d exceeds 
MAX_MESSAGE_SIZE", flowFile.getSize());
+                messageTracker.add(new FlowFileResult(flowFile, 
Collections.emptyList(), new IllegalArgumentException(message)));
+            } else {
+                baos.reset();
+                session.exportTo(flowFile, baos);
+                final ApiFuture<String> future = publishOneMessage(context, 
flowFile, baos.toByteArray());
+                messageTracker.add(new FlowFileResult(flowFile, 
Collections.singletonList(future)));
+            }
+        }
+        finishBatch(session, flowFileBatch, stopWatch, messageTracker);
+    }
 
+    private void onTriggerRecordStrategy(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch) throws ProcessException {
         try {
-            for (FlowFile flowFile : flowFiles) {
-                try {
-                    final ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
-                    session.exportTo(flowFile, baos);
-                    final ByteString flowFileContent = 
ByteString.copyFrom(baos.toByteArray());
-
-                    PubsubMessage message = 
PubsubMessage.newBuilder().setData(flowFileContent)
-                            .setPublishTime(Timestamp.newBuilder().build())
-                            .putAllAttributes(getDynamicAttributesMap(context, 
flowFile))
-                            .build();
+            onTriggerRecordStrategyInner(context, session, stopWatch, 
flowFileBatch);
+        } catch (IOException | SchemaNotFoundException | 
MalformedRecordException e) {
+            throw new ProcessException(e);
+        }
+    }
 
-                    ApiFuture<String> messageIdFuture = 
publisher.publish(message);
+    private void onTriggerRecordStrategyInner(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch)
+            throws ProcessException, IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-                    final Map<String, String> attributes = new HashMap<>();
-                    attributes.put(MESSAGE_ID_ATTRIBUTE, 
messageIdFuture.get());
-                    attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
+        final MessageTracker messageTracker = new MessageTracker();
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        for (final FlowFile flowFile : flowFileBatch) {
+            final Map<String, String> attributes = flowFile.getAttributes();
+            final RecordReader reader = readerFactory.createRecordReader(
+                    attributes, session.read(flowFile), flowFile.getSize(), 
getLogger());
+            final RecordSet recordSet = reader.createRecordSet();
+            final RecordSchema schema = writerFactory.getSchema(attributes, 
recordSet.getSchema());
 
-                    flowFile = session.putAllAttributes(flowFile, attributes);
-                    successfulFlowFiles.add(flowFile);
-                } catch (InterruptedException | ExecutionException e) {
-                    if (e.getCause() instanceof DeadlineExceededException) {
-                        getLogger().error("Failed to publish the message to 
Google Cloud PubSub topic '{}' due to {} but attempting again may succeed " +
-                                        "so routing to retry", new 
Object[]{topicName, e.getLocalizedMessage()}, e);
-                        session.transfer(flowFile, REL_RETRY);
-                    } else {
-                        getLogger().error("Failed to publish the message to 
Google Cloud PubSub topic '{}'", topicName, e);
-                        session.transfer(flowFile, REL_FAILURE);
-                    }
-                    context.yield();
-                }
+            final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), schema, baos, attributes);
+            final PushBackRecordSet pushBackRecordSet = new 
PushBackRecordSet(recordSet);
+            final List<ApiFuture<String>> futures = new ArrayList<>();
+            while (pushBackRecordSet.isAnotherRecord()) {
+                final ApiFuture<String> future = publishOneRecord(context, 
flowFile, baos, writer, pushBackRecordSet.next());
+                futures.add(future);
             }
-        } finally {
-            if (!successfulFlowFiles.isEmpty()) {
-                session.transfer(successfulFlowFiles, REL_SUCCESS);
-                for (FlowFile flowFile : successfulFlowFiles) {
-                    final long transmissionMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-                    session.getProvenanceReporter().send(flowFile, topicName, 
transmissionMillis);
+            messageTracker.add(new FlowFileResult(flowFile, futures));
+            getLogger().trace("Parsing of FlowFile (ID:{}) records complete, 
now {} messages", flowFile.getId(), messageTracker.size());
+        }
+        finishBatch(session, flowFileBatch, stopWatch, messageTracker);
+    }
+
+    private ApiFuture<String> publishOneRecord(
+            final ProcessContext context,
+            final FlowFile flowFile,
+            final ByteArrayOutputStream baos,
+            final RecordSetWriter writer,
+            final Record record) throws IOException {
+        baos.reset();
+        writer.write(record);
+        writer.flush();
+        return publishOneMessage(context, flowFile, baos.toByteArray());
+    }
+
+    private ApiFuture<String> publishOneMessage(final ProcessContext context,
+                                                final FlowFile flowFile,
+                                                final byte[] content) {
+        final PubsubMessage message = PubsubMessage.newBuilder()
+                .setData(ByteString.copyFrom(content))
+                .setPublishTime(Timestamp.newBuilder().build())
+                .putAllAttributes(getDynamicAttributesMap(context, flowFile))
+                .build();
+        return publisher.publish(message);
+    }
+
+    private void finishBatch(final ProcessSession session,
+                             final List<FlowFile> flowFileBatch,
+                             final StopWatch stopWatch,
+                             final MessageTracker messageTracker) {
+        try {
+            getLogger().trace("Submit of batch complete, size {}", 
messageTracker.size());

Review Comment:
   This could be changed to `debug`.
   ```suggestion
               getLogger().debug("Finish batch of Messages [{}]", 
messageTracker.size());
   
   ```



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java:
##########
@@ -213,61 +289,139 @@ public List<ConfigVerificationResult> verify(final 
ProcessContext context, final
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-        final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger();
-        final List<FlowFile> flowFiles = session.get(flowFileCount);
-
-        if (flowFiles.isEmpty() || publisher == null) {
-            if (storedException.get() != null) {
-                getLogger().error("Google Cloud PubSub Publisher was not 
properly created due to {}", new Object[]{storedException.get()});
-            }
+        final StopWatch stopWatch = new StopWatch(true);
+        final ContentInputStrategy inputStrategy = 
ContentInputStrategy.valueOf(context.getProperty(CONTENT_INPUT_STRATEGY).getValue());
+        final int maxBatchSize = 
context.getProperty(MAX_BATCH_SIZE).asInteger();
+        final List<FlowFile> flowFileBatch = session.get(maxBatchSize);
+        if (flowFileBatch.isEmpty()) {
             context.yield();
-            return;
+        } else if (storedException.get() != null) {
+            getLogger().error("Google Cloud PubSub Publisher was not properly 
created due to {}", storedException.get());
+            context.yield();
+        } else if 
(ContentInputStrategy.FLOWFILE_ORIENTED.equals(inputStrategy)) {
+            onTriggerFlowFileStrategy(context, session, stopWatch, 
flowFileBatch);
+        } else if (ContentInputStrategy.RECORD_ORIENTED.equals(inputStrategy)) 
{
+            onTriggerRecordStrategy(context, session, stopWatch, 
flowFileBatch);
+        } else {
+            throw new IllegalStateException(inputStrategy.getValue());
         }
+    }
+
+    private void onTriggerFlowFileStrategy(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch) throws ProcessException {
+        final long maxMessageSize = 
context.getProperty(MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).longValue();
 
-        final long startNanos = System.nanoTime();
-        final List<FlowFile> successfulFlowFiles = new ArrayList<>();
-        final String topicName = getTopicName(context).toString();
+        final MessageTracker messageTracker = new MessageTracker();
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        for (final FlowFile flowFile : flowFileBatch) {
+            if (flowFile.getSize() > maxMessageSize) {
+                final String message = String.format("FlowFile size %d exceeds 
MAX_MESSAGE_SIZE", flowFile.getSize());
+                messageTracker.add(new FlowFileResult(flowFile, 
Collections.emptyList(), new IllegalArgumentException(message)));
+            } else {
+                baos.reset();
+                session.exportTo(flowFile, baos);
+                final ApiFuture<String> future = publishOneMessage(context, 
flowFile, baos.toByteArray());
+                messageTracker.add(new FlowFileResult(flowFile, 
Collections.singletonList(future)));
+            }
+        }
+        finishBatch(session, flowFileBatch, stopWatch, messageTracker);
+    }
 
+    private void onTriggerRecordStrategy(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch) throws ProcessException {
         try {
-            for (FlowFile flowFile : flowFiles) {
-                try {
-                    final ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
-                    session.exportTo(flowFile, baos);
-                    final ByteString flowFileContent = 
ByteString.copyFrom(baos.toByteArray());
-
-                    PubsubMessage message = 
PubsubMessage.newBuilder().setData(flowFileContent)
-                            .setPublishTime(Timestamp.newBuilder().build())
-                            .putAllAttributes(getDynamicAttributesMap(context, 
flowFile))
-                            .build();
+            onTriggerRecordStrategyInner(context, session, stopWatch, 
flowFileBatch);
+        } catch (IOException | SchemaNotFoundException | 
MalformedRecordException e) {
+            throw new ProcessException(e);
+        }
+    }
 
-                    ApiFuture<String> messageIdFuture = 
publisher.publish(message);
+    private void onTriggerRecordStrategyInner(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch)
+            throws ProcessException, IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-                    final Map<String, String> attributes = new HashMap<>();
-                    attributes.put(MESSAGE_ID_ATTRIBUTE, 
messageIdFuture.get());
-                    attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
+        final MessageTracker messageTracker = new MessageTracker();
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        for (final FlowFile flowFile : flowFileBatch) {
+            final Map<String, String> attributes = flowFile.getAttributes();
+            final RecordReader reader = readerFactory.createRecordReader(
+                    attributes, session.read(flowFile), flowFile.getSize(), 
getLogger());
+            final RecordSet recordSet = reader.createRecordSet();
+            final RecordSchema schema = writerFactory.getSchema(attributes, 
recordSet.getSchema());
 
-                    flowFile = session.putAllAttributes(flowFile, attributes);
-                    successfulFlowFiles.add(flowFile);
-                } catch (InterruptedException | ExecutionException e) {
-                    if (e.getCause() instanceof DeadlineExceededException) {
-                        getLogger().error("Failed to publish the message to 
Google Cloud PubSub topic '{}' due to {} but attempting again may succeed " +
-                                        "so routing to retry", new 
Object[]{topicName, e.getLocalizedMessage()}, e);
-                        session.transfer(flowFile, REL_RETRY);
-                    } else {
-                        getLogger().error("Failed to publish the message to 
Google Cloud PubSub topic '{}'", topicName, e);
-                        session.transfer(flowFile, REL_FAILURE);
-                    }
-                    context.yield();
-                }
+            final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), schema, baos, attributes);
+            final PushBackRecordSet pushBackRecordSet = new 
PushBackRecordSet(recordSet);
+            final List<ApiFuture<String>> futures = new ArrayList<>();
+            while (pushBackRecordSet.isAnotherRecord()) {
+                final ApiFuture<String> future = publishOneRecord(context, 
flowFile, baos, writer, pushBackRecordSet.next());
+                futures.add(future);
             }
-        } finally {
-            if (!successfulFlowFiles.isEmpty()) {
-                session.transfer(successfulFlowFiles, REL_SUCCESS);
-                for (FlowFile flowFile : successfulFlowFiles) {
-                    final long transmissionMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-                    session.getProvenanceReporter().send(flowFile, topicName, 
transmissionMillis);
+            messageTracker.add(new FlowFileResult(flowFile, futures));
+            getLogger().trace("Parsing of FlowFile (ID:{}) records complete, 
now {} messages", flowFile.getId(), messageTracker.size());
+        }
+        finishBatch(session, flowFileBatch, stopWatch, messageTracker);
+    }
+
+    private ApiFuture<String> publishOneRecord(
+            final ProcessContext context,
+            final FlowFile flowFile,
+            final ByteArrayOutputStream baos,
+            final RecordSetWriter writer,
+            final Record record) throws IOException {
+        baos.reset();
+        writer.write(record);
+        writer.flush();
+        return publishOneMessage(context, flowFile, baos.toByteArray());
+    }
+
+    private ApiFuture<String> publishOneMessage(final ProcessContext context,
+                                                final FlowFile flowFile,
+                                                final byte[] content) {
+        final PubsubMessage message = PubsubMessage.newBuilder()
+                .setData(ByteString.copyFrom(content))
+                .setPublishTime(Timestamp.newBuilder().build())
+                .putAllAttributes(getDynamicAttributesMap(context, flowFile))
+                .build();
+        return publisher.publish(message);
+    }
+
+    private void finishBatch(final ProcessSession session,
+                             final List<FlowFile> flowFileBatch,
+                             final StopWatch stopWatch,
+                             final MessageTracker messageTracker) {
+        try {
+            getLogger().trace("Submit of batch complete, size {}", 
messageTracker.size());
+            final List<String> messageIdsSuccess = 
ApiFutures.successfulAsList(messageTracker.getFutures()).get();
+            getLogger().trace("Send of batch complete, success size {}", 
messageIdsSuccess.size());
+            messageTracker.reconcile(messageIdsSuccess);
+            final String topicName = publisher.getTopicNameString();
+            for (final FlowFileResult flowFileResult : 
messageTracker.getFlowFileResults()) {
+                final Map<String, String> attributes = new HashMap<>();
+                //attributes.put(MESSAGE_ID_ATTRIBUTE, messageIdFuture.get()); 
 // what to do if using record strategy?
+                attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
+                final FlowFile flowFile = 
session.putAllAttributes(flowFileResult.getFlowFile(), attributes);
+                final String transitUri = 
String.format(TRANSIT_URI_FORMAT_STRING, topicName);
+                session.getProvenanceReporter().send(flowFile, transitUri, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+                session.transfer(flowFile, flowFileResult.getRelationship());
+                if (flowFileResult.getException() != null) {
+                    getLogger().error("FlowFile send failure", 
flowFileResult.getException());

Review Comment:
   The FlowFile should be included.
   ```suggestion
                       getLogger().error("Send Failed for {}", flowFile, 
flowFileResult.getException());
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to