This is an automated email from the ASF dual-hosted git repository.

lehelb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new fb17c10b2f NIFI-14020 - Add Record and Demarcator support to 
ConsumeGCPubSub
fb17c10b2f is described below

commit fb17c10b2fc2ebf78f96f7e9b105f2143c3b5235
Author: Pierre Villard <[email protected]>
AuthorDate: Mon Nov 18 13:32:48 2024 +0100

    NIFI-14020 - Add Record and Demarcator support to ConsumeGCPubSub
    
    This closes #9530.
    
    Signed-off-by: Lehel Boer <[email protected]>
---
 .../processors/gcp/pubsub/ConsumeGCPubSub.java     | 208 +++++++++++++++--
 .../processors/gcp/pubsub/PubSubAttributes.java    |   3 +
 .../consume/AbstractPubSubMessageConverter.java    | 173 ++++++++++++++
 .../gcp/pubsub/consume/OutputStrategy.java         |  52 +++++
 .../gcp/pubsub/consume/ProcessingStrategy.java     |  52 +++++
 .../gcp/pubsub/consume/PubSubMessageConverter.java |  32 +++
 .../RecordStreamPubSubMessageConverter.java        |  44 ++++
 .../gcp/pubsub/consume/WrapperRecord.java          |  84 +++++++
 .../WrapperRecordStreamPubSubMessageConverter.java |  45 ++++
 .../processors/gcp/pubsub/ConsumeGCPubSubTest.java | 251 +++++++++++++++++++++
 10 files changed, 919 insertions(+), 25 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
index 8eae56ec59..bde5fc788e 100644
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
@@ -39,6 +39,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.ConfigVerificationResult.Outcome;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
@@ -46,9 +47,19 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.pubsub.consume.OutputStrategy;
+import org.apache.nifi.processors.gcp.pubsub.consume.ProcessingStrategy;
+import org.apache.nifi.processors.gcp.pubsub.consume.PubSubMessageConverter;
+import 
org.apache.nifi.processors.gcp.pubsub.consume.RecordStreamPubSubMessageConverter;
+import 
org.apache.nifi.processors.gcp.pubsub.consume.WrapperRecordStreamPubSubMessageConverter;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -58,6 +69,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
 import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE;
 import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_DESCRIPTION;
 import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE;
@@ -69,19 +81,32 @@ import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH
 import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION;
 import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE;
 import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_DESCRIPTION;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SUBSCRIPTION_NAME_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SUBSCRIPTION_NAME_DESCRIPTION;
 
-@SeeAlso({PublishGCPubSub.class})
+@SeeAlso({ PublishGCPubSub.class })
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
-@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "consume"})
-@CapabilityDescription("Consumes message from the configured Google Cloud 
PubSub subscription. If the 'Batch Size' is set, " +
-        "the configured number of messages will be pulled in a single request, 
else only one message will be pulled.")
-@WritesAttributes({
-        @WritesAttribute(attribute = ACK_ID_ATTRIBUTE, description = 
ACK_ID_DESCRIPTION),
-        @WritesAttribute(attribute = SERIALIZED_SIZE_ATTRIBUTE, description = 
SERIALIZED_SIZE_DESCRIPTION),
-        @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE, 
description = MSG_ATTRIBUTES_COUNT_DESCRIPTION),
-        @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description = 
MSG_PUBLISH_TIME_DESCRIPTION),
-        @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description 
= DYNAMIC_ATTRIBUTES_DESCRIPTION)
-})
+@Tags({ "google", "google-cloud", "gcp", "message", "pubsub", "consume" })
+@CapabilityDescription(
+    """
+     Consumes messages from the configured Google Cloud PubSub subscription. The 'Batch Size' property specifies the maximum
+     number of messages that will be pulled from the subscription in a single request. The 'Processing Strategy' property
+     specifies if each message should be its own FlowFile or if messages should be grouped into a single FlowFile. Using the
+     Demarcator strategy will provide the best throughput when the format allows it. Using the Record strategy allows converting
+     the data format as well as enforcing a schema. Using the FlowFile strategy will generate one FlowFile per message and will have
+     the message's attributes as FlowFile attributes.
+    """
+)
+@WritesAttributes(
+    {
+            @WritesAttribute(attribute = ACK_ID_ATTRIBUTE, description = 
ACK_ID_DESCRIPTION),
+            @WritesAttribute(attribute = SERIALIZED_SIZE_ATTRIBUTE, 
description = SERIALIZED_SIZE_DESCRIPTION),
+            @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE, 
description = MSG_ATTRIBUTES_COUNT_DESCRIPTION),
+            @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, 
description = MSG_PUBLISH_TIME_DESCRIPTION),
+            @WritesAttribute(attribute = SUBSCRIPTION_NAME_ATTRIBUTE, 
description = SUBSCRIPTION_NAME_DESCRIPTION),
+            @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, 
description = DYNAMIC_ATTRIBUTES_DESCRIPTION)
+    }
+)
 public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
 
     private static final List<String> REQUIRED_PERMISSIONS = 
Collections.singletonList("pubsub.subscriptions.consume");
@@ -95,21 +120,81 @@ public class ConsumeGCPubSub extends 
AbstractGCPubSubProcessor {
             .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
             .build();
 
+    // Selects how pulled messages are turned into FlowFiles; the properties below depend on this value.
+    public static final PropertyDescriptor PROCESSING_STRATEGY = new PropertyDescriptor.Builder()
+            .name("Processing Strategy")
+            .description("Strategy for processing PubSub Records and writing serialized output to FlowFiles")
+            .required(true)
+            .allowableValues(ProcessingStrategy.class)
+            .defaultValue(ProcessingStrategy.FLOW_FILE.getValue())
+            .expressionLanguageSupported(NONE)
+            .build();
+
+    // Only applicable when the Processing Strategy is RECORD.
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("Record Reader")
+            .description("The Record Reader to use for incoming messages")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD)
+            .build();
+
+    // Only applicable when the Processing Strategy is RECORD.
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("Record Writer")
+            .description("The Record Writer to use in order to serialize the outgoing FlowFiles")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(true)
+            .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD)
+            .build();
+
+    // Only applicable when the Processing Strategy is RECORD; chooses between payload-only and wrapper output.
+    public static final PropertyDescriptor OUTPUT_STRATEGY = new PropertyDescriptor.Builder()
+            .name("Output Strategy")
+            .description("The format used to output the PubSub message into a FlowFile Record.")
+            .required(true)
+            .defaultValue(OutputStrategy.USE_VALUE)
+            .allowableValues(OutputStrategy.class)
+            .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD)
+            .build();
+
+    // Only applicable when the Processing Strategy is DEMARCATOR; any string is accepted (Validator.VALID).
+    public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
+            .name("Message Demarcator")
+            .required(true)
+            .addValidator(Validator.VALID)
+            .description(
+                """
+                Since the PubSub client receives messages in batches, this Processor has an option to output FlowFiles
+                which contains all the messages in a single batch. This property allows you to provide a string
+                (interpreted as UTF-8) to use for demarcating apart multiple messages. To enter special character
+                such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS.
+                """)
+            .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.DEMARCATOR)
+            .build();
+
+    // Destination for messages whose payload cannot be parsed by the configured Record Reader.
+    public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
+            .name("parse failure")
+            .description("If configured to use a Record Reader, a PubSub message that cannot be parsed using the configured Record Reader will be routed to this relationship")
+            .build();
+
+
     private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(
             GCP_CREDENTIALS_PROVIDER_SERVICE,
             PROJECT_ID,
             SUBSCRIPTION,
             BATCH_SIZE_THRESHOLD,
+            PROCESSING_STRATEGY,
+            RECORD_READER,
+            RECORD_WRITER,
+            OUTPUT_STRATEGY,
+            MESSAGE_DEMARCATOR,
             API_ENDPOINT,
-            PROXY_CONFIGURATION_SERVICE
-    );
+            PROXY_CONFIGURATION_SERVICE);
 
-    public static final Set<Relationship> RELATIONSHIPS = Set.of(
-            REL_SUCCESS
-    );
+    private static final Set<Relationship> SUCCESS_RELATIONSHIP = 
Set.of(REL_SUCCESS);
+    private static final Set<Relationship> SUCCESS_FAILURE_RELATIONSHIPS = 
Set.of(REL_SUCCESS, REL_PARSE_FAILURE);
 
-    private SubscriberStub subscriber = null;
+    protected SubscriberStub subscriber = null;
     private PullRequest pullRequest;
+    protected volatile OutputStrategy outputStrategy;
+    protected volatile ProcessingStrategy processingStrategy;
+    private volatile boolean useReader;
+    protected volatile String demarcatorValue;
 
     private final AtomicReference<Exception> storedException = new 
AtomicReference<>();
 
@@ -120,9 +205,17 @@ public class ConsumeGCPubSub extends 
AbstractGCPubSubProcessor {
 
     @Override
     public Set<Relationship> getRelationships() {
-        return RELATIONSHIPS;
+        return useReader ? SUCCESS_FAILURE_RELATIONSHIPS : 
SUCCESS_RELATIONSHIP;
+    }
+
+    /**
+     * Records whether a Record Reader is currently configured. The flag is read by
+     * {@code getRelationships()} to decide whether the 'parse failure' relationship is exposed.
+     *
+     * @param descriptor the property that changed
+     * @param oldValue   the previous value (unused)
+     * @param newValue   the new value, or {@code null} when the property was cleared
+     */
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+        // Only the Record Reader property influences the set of relationships.
+        if (!descriptor.equals(RECORD_READER)) {
+            return;
+        }
+
+        useReader = newValue != null;
+    }
 
+    @Override
     @OnScheduled
     public void onScheduled(ProcessContext context) {
         final Integer batchSize = 
context.getProperty(BATCH_SIZE_THRESHOLD).asInteger();
@@ -138,6 +231,10 @@ public class ConsumeGCPubSub extends 
AbstractGCPubSubProcessor {
             storedException.set(e);
             getLogger().error("Failed to create Google Cloud Subscriber", e);
         }
+
+        outputStrategy = 
context.getProperty(OUTPUT_STRATEGY).asAllowableValue(OutputStrategy.class);
+        processingStrategy = 
context.getProperty(PROCESSING_STRATEGY).asAllowableValue(ProcessingStrategy.class);
+        demarcatorValue = context.getProperty(MESSAGE_DEMARCATOR).getValue();
     }
 
     @Override
@@ -227,30 +324,91 @@ public class ConsumeGCPubSub extends 
AbstractGCPubSubProcessor {
         final PullResponse pullResponse = 
subscriber.pullCallable().call(pullRequest);
         final List<String> ackIds = new ArrayList<>();
         final String subscriptionName = getSubscriptionName(context, null);
+        List<ReceivedMessage> receivedMessages = 
pullResponse.getReceivedMessagesList();
 
-        for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) 
{
+        switch (processingStrategy) {
+        case RECORD -> processInputRecords(context, session, receivedMessages, 
subscriptionName, ackIds);
+        case FLOW_FILE -> processInputFlowFile(session, receivedMessages, 
subscriptionName, ackIds);
+        case DEMARCATOR -> processInputDemarcator(session, receivedMessages, 
subscriptionName, ackIds);
+        }
+
+        session.commitAsync(() -> acknowledgeAcks(ackIds, subscriptionName));
+    }
+
+    /**
+     * Writes the payloads of all received messages into a single FlowFile, appending the configured
+     * demarcator after every payload (including the last one), and routes that FlowFile to success.
+     * <p>
+     * The ack ID of each message that carries a payload is added to {@code ackIds}. If writing fails,
+     * the collected ack IDs are cleared and the FlowFile is removed so nothing is acknowledged.
+     * A FlowFile that ends up empty is removed instead of being transferred.
+     *
+     * @param session          the session used to create, write and transfer the FlowFile
+     * @param receivedMessages the messages returned by the pull request
+     * @param subscriptionName the subscription name, written as an attribute and used as the provenance transit URI
+     * @param ackIds           output list collecting the ack IDs of the messages written
+     * @throws ProcessException if the batch cannot be written to the FlowFile
+     */
+    private void processInputDemarcator(final ProcessSession session, final List<ReceivedMessage> receivedMessages, final String subscriptionName,
+            final List<String> ackIds) {
+        final byte[] demarcator = demarcatorValue == null ? new byte[0] : demarcatorValue.getBytes(StandardCharsets.UTF_8);
+        FlowFile flowFile = session.create();
+
+        try {
+            flowFile = session.write(flowFile, new OutputStreamCallback() {
+                @Override
+                public void process(OutputStream out) throws IOException {
+                    for (ReceivedMessage message : receivedMessages) {
+                        if (message.hasMessage()) {
+                            out.write(message.getMessage().getData().toByteArray());
+                            out.write(demarcator);
+                            ackIds.add(message.getAckId());
+                        }
+                    }
+                }
+            });
+            // Session mutators return the updated FlowFile; keep the reference current for later calls.
+            flowFile = session.putAttribute(flowFile, SUBSCRIPTION_NAME_ATTRIBUTE, subscriptionName);
+        } catch (Exception e) {
+            // Nothing was delivered, so nothing must be acknowledged.
+            ackIds.clear();
+            session.remove(flowFile);
+            throw new ProcessException("Failed to write batch of messages in FlowFile", e);
+        }
+
+        if (flowFile.getSize() > 0) {
+            flowFile = session.putAttribute(flowFile, "record.count", String.valueOf(ackIds.size()));
+            session.transfer(flowFile, REL_SUCCESS);
+            session.getProvenanceReporter().receive(flowFile, subscriptionName);
+            session.adjustCounter("Records Received from " + subscriptionName, ackIds.size(), false);
+        } else {
+            // No content was written; do not emit an empty FlowFile.
+            session.remove(flowFile);
+        }
+    }
+
+    private void processInputFlowFile(final ProcessSession session, final 
List<ReceivedMessage> receivedMessages, final String subscriptionName, final 
List<String> ackIds) {
+        for (ReceivedMessage message : receivedMessages) {
             if (message.hasMessage()) {
                 FlowFile flowFile = session.create();
 
                 final Map<String, String> attributes = new HashMap<>();
-                ackIds.add(message.getAckId());
-
                 attributes.put(ACK_ID_ATTRIBUTE, message.getAckId());
                 attributes.put(SERIALIZED_SIZE_ATTRIBUTE, 
String.valueOf(message.getSerializedSize()));
                 attributes.put(MESSAGE_ID_ATTRIBUTE, 
message.getMessage().getMessageId());
                 attributes.put(MSG_ATTRIBUTES_COUNT_ATTRIBUTE, 
String.valueOf(message.getMessage().getAttributesCount()));
                 attributes.put(MSG_PUBLISH_TIME_ATTRIBUTE, 
String.valueOf(message.getMessage().getPublishTime().getSeconds()));
+                attributes.put(SUBSCRIPTION_NAME_ATTRIBUTE, subscriptionName);
                 attributes.putAll(message.getMessage().getAttributesMap());
 
                 flowFile = session.putAllAttributes(flowFile, attributes);
                 flowFile = session.write(flowFile, out -> 
out.write(message.getMessage().getData().toByteArray()));
 
+                ackIds.add(message.getAckId());
                 session.transfer(flowFile, REL_SUCCESS);
                 session.getProvenanceReporter().receive(flowFile, 
subscriptionName);
             }
         }
+    }
 
-        session.commitAsync(() -> acknowledgeAcks(ackIds, subscriptionName));
+    /**
+     * Converts the received messages to record-oriented FlowFiles using the configured Record Reader
+     * and Record Writer. The converter implementation is chosen from the current Output Strategy.
+     *
+     * @param context          used to look up the Record Reader and Record Writer controller services
+     * @param session          the session handed to the converter
+     * @param receivedMessages the messages returned by the pull request
+     * @param subscriptionName the subscription the messages were pulled from
+     * @param ackIds           output list handed to the converter to collect ack IDs
+     * @throws ProcessException if the Output Strategy is neither USE_VALUE nor USE_WRAPPER
+     */
+    private void processInputRecords(final ProcessContext context, final ProcessSession session, final List<ReceivedMessage> receivedMessages, final String subscriptionName,
+            final List<String> ackIds) {
+        final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory recordWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final OutputStrategy strategy = outputStrategy;
+
+        final PubSubMessageConverter messageConverter;
+        if (strategy == OutputStrategy.USE_VALUE) {
+            messageConverter = new RecordStreamPubSubMessageConverter(recordReaderFactory, recordWriterFactory, getLogger());
+        } else if (strategy == OutputStrategy.USE_WRAPPER) {
+            messageConverter = new WrapperRecordStreamPubSubMessageConverter(recordReaderFactory, recordWriterFactory, getLogger());
+        } else {
+            throw new ProcessException(String.format("Output Strategy not supported [%s]", strategy));
+        }
+
+        messageConverter.toFlowFiles(session, receivedMessages, ackIds, subscriptionName);
     }
 
     private void acknowledgeAcks(final Collection<String> ackIds, final String 
subscriptionName) {
@@ -259,9 +417,9 @@ public class ConsumeGCPubSub extends 
AbstractGCPubSubProcessor {
         }
 
         AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()
-            .addAllAckIds(ackIds)
-            .setSubscription(subscriptionName)
-            .build();
+                .addAllAckIds(ackIds)
+                .setSubscription(subscriptionName)
+                .build();
         subscriber.acknowledgeCallable().call(acknowledgeRequest);
     }
 
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java
index b9654112d4..b2df8ded14 100644
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java
@@ -39,6 +39,9 @@ public class PubSubAttributes {
     public static final String MSG_PUBLISH_TIME_ATTRIBUTE = 
"gcp.pubsub.publishTime";
     public static final String MSG_PUBLISH_TIME_DESCRIPTION = "Timestamp value 
when the message was published";
 
+    public static final String SUBSCRIPTION_NAME_ATTRIBUTE = 
"gcp.pubsub.subscription";
+    public static final String SUBSCRIPTION_NAME_DESCRIPTION = "Name of the 
PubSub subscription";
+
     public static final String DYNAMIC_ATTRIBUTES_ATTRIBUTE = "Dynamic 
Attributes";
     public static final String DYNAMIC_ATTRIBUTES_DESCRIPTION = "Other than 
the listed attributes, this processor may write zero or more attributes, " +
             "if the original Google Cloud Publisher client added any 
attributes to the message while sending";
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/AbstractPubSubMessageConverter.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/AbstractPubSubMessageConverter.java
new file mode 100644
index 0000000000..eb4f449264
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/AbstractPubSubMessageConverter.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.consume;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub;
+import org.apache.nifi.provenance.ProvenanceReporter;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SUBSCRIPTION_NAME_ATTRIBUTE;
+
+/**
+ * Base converter that parses PubSub message payloads with a Record Reader and writes the parsed
+ * records with a Record Writer, producing one FlowFile per distinct write schema.
+ * <p>
+ * Subclasses choose the record that is written for each parsed record
+ * ({@link #getRecord(Record, ReceivedMessage)}) and the schema handed to the writer factory
+ * ({@link #getRecordSchema(RecordSchema)}).
+ */
+public abstract class AbstractPubSubMessageConverter implements PubSubMessageConverter {
+
+    protected final RecordReaderFactory readerFactory;
+    protected final RecordSetWriterFactory writerFactory;
+    protected final ComponentLog logger;
+
+    /**
+     * @param readerFactory factory used to create a Record Reader for every message payload
+     * @param writerFactory factory used to resolve write schemas and create Record Writers
+     * @param logger        logger passed to the reader/writer factories and used for parse errors
+     */
+    protected AbstractPubSubMessageConverter(
+            final RecordReaderFactory readerFactory,
+            final RecordSetWriterFactory writerFactory,
+            final ComponentLog logger) {
+        this.readerFactory = readerFactory;
+        this.writerFactory = writerFactory;
+        this.logger = logger;
+    }
+
+    /**
+     * Parses every message payload into records and appends them to the FlowFile associated with the
+     * record's write schema. Messages whose payload cannot be processed are routed, with their raw
+     * payload, to the 'parse failure' relationship. The ack ID of every message is added to
+     * {@code ackIds}, including messages routed to 'parse failure'.
+     *
+     * @param session          session used to create, write and transfer FlowFiles
+     * @param messages         the messages returned by the pull request
+     * @param ackIds           output list collecting the ack IDs of handled messages
+     * @param subscriptionName the subscription the messages were pulled from
+     * @throws ProcessException if anything other than a per-message parse failure goes wrong
+     */
+    @Override
+    public void toFlowFiles(final ProcessSession session, final List<ReceivedMessage> messages, final List<String> ackIds, final String subscriptionName) {
+        try {
+            final Map<RecordSchema, RecordGroup> recordGroups = new HashMap<>();
+            // No FlowFile attributes exist yet; an empty map is handed to the reader/writer factories.
+            final Map<String, String> attributes = new HashMap<>();
+
+            for (ReceivedMessage message : messages) {
+
+                if (message.hasMessage()) {
+                    byte[] payload = message.getMessage().getData().toByteArray();
+                    try (final InputStream in = new ByteArrayInputStream(payload);
+                            final RecordReader valueRecordReader = readerFactory.createRecordReader(attributes, in, payload.length, logger)) {
+
+                        // NOTE(review): records are written as they are parsed, so when a later record of the
+                        // same message fails, the earlier ones stay in the success FlowFile while the whole
+                        // payload is also routed to 'parse failure' - confirm whether that is acceptable.
+                        Record record;
+                        while ((record = valueRecordReader.nextRecord()) != null) {
+
+                            final RecordSchema recordSchema = record.getSchema();
+                            final RecordSchema writeSchema = writerFactory.getSchema(attributes, getRecordSchema(recordSchema));
+
+                            // Get or register the Record Group associated with the write schema of this record
+                            final RecordGroup recordGroup = recordGroups.computeIfAbsent(writeSchema, schema -> {
+                                final FlowFile flowFile = session.create();
+                                try {
+                                    final OutputStream out = session.write(flowFile);
+                                    final RecordSetWriter writer = writerFactory.createWriter(logger, writeSchema, out, attributes);
+                                    writer.beginRecordSet();
+                                    return new RecordGroup(flowFile, writer);
+                                } catch (Exception e) {
+                                    session.remove(flowFile);
+                                    throw new ProcessException("Failed to create RecordSetWriter", e);
+                                }
+                            });
+
+                            // Create the Record object and write it to the Record Writer.
+                            recordGroup.writer().write(getRecord(record, message));
+                        }
+
+                    } catch (final MalformedRecordException | IOException | SchemaNotFoundException e) {
+                        logger.error("Failed to parse the record. Transfer to a 'parse failure' relationship", e);
+                        // handleParseFailure tracks the ack ID itself, hence the 'continue'.
+                        handleParseFailure(session, message, payload, ackIds, subscriptionName);
+                        continue;
+                    }
+                }
+
+                // Track the ack ID for the message
+                ackIds.add(message.getAckId());
+            }
+
+            // Finish writing the records
+            finishRecordGroups(session, recordGroups, subscriptionName);
+
+        } catch (final Exception e) {
+            throw new ProcessException("FlowFile Record conversion failed", e);
+        }
+    }
+
+    /**
+     * Produces the record to write for a parsed record.
+     *
+     * @param record  the record parsed from the message payload
+     * @param message the PubSub message the record was parsed from
+     * @return the record handed to the Record Writer
+     */
+    protected abstract Record getRecord(Record record, ReceivedMessage message);
+
+    /**
+     * Produces the schema passed to the writer factory for a parsed record's schema.
+     *
+     * @param recordSchema the schema of the parsed record
+     * @return the schema used to resolve the write schema
+     */
+    protected abstract RecordSchema getRecordSchema(RecordSchema recordSchema);
+
+    /**
+     * Routes the raw payload of a message that could not be parsed to the 'parse failure'
+     * relationship, carrying the message's attributes and PubSub metadata, and tracks its ack ID.
+     */
+    private void handleParseFailure(final ProcessSession session, final ReceivedMessage message, final byte[] payload,
+            final List<String> ackIds, final String subscriptionName) {
+        // Failed to parse the record. Transfer to the 'parse failure' relationship
+        FlowFile flowFile = session.create();
+        // Message attributes go first so the PubSub metadata below takes precedence on key clashes.
+        flowFile = session.putAllAttributes(flowFile, message.getMessage().getAttributesMap());
+        flowFile = session.putAttribute(flowFile, ACK_ID_ATTRIBUTE, message.getAckId());
+        flowFile = session.putAttribute(flowFile, SERIALIZED_SIZE_ATTRIBUTE, String.valueOf(message.getSerializedSize()));
+        flowFile = session.putAttribute(flowFile, MESSAGE_ID_ATTRIBUTE, message.getMessage().getMessageId());
+        flowFile = session.putAttribute(flowFile, MSG_ATTRIBUTES_COUNT_ATTRIBUTE, String.valueOf(message.getMessage().getAttributesCount()));
+        flowFile = session.putAttribute(flowFile, MSG_PUBLISH_TIME_ATTRIBUTE, String.valueOf(message.getMessage().getPublishTime().getSeconds()));
+        flowFile = session.putAttribute(flowFile, SUBSCRIPTION_NAME_ATTRIBUTE, subscriptionName);
+        flowFile = session.write(flowFile, out -> out.write(payload));
+        session.transfer(flowFile, ConsumeGCPubSub.REL_PARSE_FAILURE);
+        session.adjustCounter("Records Received from " + subscriptionName, 1, false);
+
+        // Track the ack ID for the message
+        ackIds.add(message.getAckId());
+    }
+
+    /**
+     * Finishes and closes every open record set, then adds the writer's attributes, record count,
+     * MIME type and subscription name to each FlowFile and transfers it to success.
+     *
+     * @throws ProcessException if a record set cannot be finished
+     */
+    private void finishRecordGroups(ProcessSession session, Map<RecordSchema, RecordGroup> recordGroups, String subscriptionName) {
+        for (final RecordGroup recordGroup : recordGroups.values()) {
+            final Map<String, String> newAttributes;
+            final int recordCount;
+            // try-with-resources closes the writer once the record set is finished.
+            try (final RecordSetWriter writer = recordGroup.writer()) {
+                final WriteResult writeResult = writer.finishRecordSet();
+                newAttributes = new HashMap<>(writeResult.getAttributes());
+                newAttributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                newAttributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                newAttributes.put(SUBSCRIPTION_NAME_ATTRIBUTE, subscriptionName);
+                recordCount = writeResult.getRecordCount();
+            } catch (IOException e) {
+                throw new ProcessException("Failed to finish writing records", e);
+            }
+
+            FlowFile flowFile = recordGroup.flowFile();
+            flowFile = session.putAllAttributes(flowFile, newAttributes);
+            final ProvenanceReporter provenanceReporter = session.getProvenanceReporter();
+            provenanceReporter.receive(flowFile, subscriptionName);
+            session.transfer(flowFile, ConsumeGCPubSub.REL_SUCCESS);
+            session.adjustCounter("Records Received from " + subscriptionName, recordCount, false);
+        }
+    }
+
+    /** Pairs a FlowFile with the Record Writer that is writing into it. */
+    private record RecordGroup(FlowFile flowFile, RecordSetWriter writer) {
+    }
+
+}
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/OutputStrategy.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/OutputStrategy.java
new file mode 100644
index 0000000000..aab5ca7238
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/OutputStrategy.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.consume;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Output strategies selectable for PubSub consumption. Each constant carries
+ * the value, display name and description exposed through
+ * {@link DescribedValue}.
+ */
+public enum OutputStrategy implements DescribedValue {
+    USE_VALUE("USE_VALUE", "Use Content as Value", "Write only the message payload to the FlowFile record."),
+    USE_WRAPPER("USE_WRAPPER", "Use Wrapper", "Write the message's attributes into the FlowFile record.");
+
+    private final String value;
+    private final String displayName;
+    private final String description;
+
+    /**
+     * @param strategyValue value returned by {@link #getValue()}
+     * @param label text returned by {@link #getDisplayName()}
+     * @param explanation text returned by {@link #getDescription()}
+     */
+    OutputStrategy(final String strategyValue, final String label, final String explanation) {
+        value = strategyValue;
+        displayName = label;
+        description = explanation;
+    }
+
+    /** @return the description given to this constant */
+    @Override
+    public String getDescription() {
+        return this.description;
+    }
+
+    /** @return the display name given to this constant */
+    @Override
+    public String getDisplayName() {
+        return this.displayName;
+    }
+
+    /** @return the value given to this constant */
+    @Override
+    public String getValue() {
+        return this.value;
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/ProcessingStrategy.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/ProcessingStrategy.java
new file mode 100644
index 0000000000..d1ebc3f0c4
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/ProcessingStrategy.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.consume;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Supported strategies for consuming PubSub messages and serializing them
+ * into FlowFiles. The constant name doubles as both the value and the
+ * display name.
+ */
+public enum ProcessingStrategy implements DescribedValue {
+    FLOW_FILE("Write one FlowFile for each PubSub message consumed"),
+    DEMARCATOR("Write one FlowFile for each batch of PubSub messages consumed"),
+    RECORD("Write one FlowFile containing multiple PubSub messages consumed and processed with Record Reader and Record Writer");
+
+    private final String description;
+
+    /**
+     * @param explanation text returned by {@link #getDescription()}
+     */
+    ProcessingStrategy(final String explanation) {
+        description = explanation;
+    }
+
+    /** @return the description given to this constant */
+    @Override
+    public String getDescription() {
+        return this.description;
+    }
+
+    /** @return the constant name */
+    @Override
+    public String getDisplayName() {
+        return name();
+    }
+
+    /** @return the constant name */
+    @Override
+    public String getValue() {
+        return name();
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/PubSubMessageConverter.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/PubSubMessageConverter.java
new file mode 100644
index 0000000000..33c6dc5fce
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/PubSubMessageConverter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.consume;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.nifi.processor.ProcessSession;
+
+import java.util.List;
+
+/**
+ * Strategy interface for turning PubSub messages into NiFi FlowFiles; each
+ * implementation of {@link PubSubMessageConverter} applies its own
+ * specialized conversion.
+ */
+public interface PubSubMessageConverter {
+
+    /**
+     * Converts the given PubSub messages into FlowFiles.
+     *
+     * @param session process session made available to the conversion
+     * @param messages received PubSub messages to convert
+     * @param ackIds list of acknowledgement ids; NOTE(review): whether implementations read or populate this list is not visible here - confirm against the implementations
+     * @param subscriptionName name of the subscription associated with the messages
+     */
+    void toFlowFiles(ProcessSession session, List<ReceivedMessage> messages, List<String> ackIds, String subscriptionName);
+
+}
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/RecordStreamPubSubMessageConverter.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/RecordStreamPubSubMessageConverter.java
new file mode 100644
index 0000000000..75eb2e83be
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/RecordStreamPubSubMessageConverter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.consume;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+/**
+ * Converter variant that passes both the parsed record and its schema through
+ * unchanged, without adding any information from the PubSub message.
+ */
+public class RecordStreamPubSubMessageConverter extends AbstractPubSubMessageConverter implements PubSubMessageConverter {
+
+    /**
+     * @param readerFactory reader factory handed to the base converter
+     * @param writerFactory writer factory handed to the base converter
+     * @param logger logger handed to the base converter
+     */
+    public RecordStreamPubSubMessageConverter(final RecordReaderFactory readerFactory, final RecordSetWriterFactory writerFactory, final ComponentLog logger) {
+        super(readerFactory, writerFactory, logger);
+    }
+
+    /**
+     * @return the given schema, unchanged
+     */
+    @Override
+    protected RecordSchema getRecordSchema(final RecordSchema recordSchema) {
+        return recordSchema;
+    }
+
+    /**
+     * @return the given record, unchanged; the message is not consulted
+     */
+    @Override
+    protected Record getRecord(final Record record, final ReceivedMessage message) {
+        return record;
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/WrapperRecord.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/WrapperRecord.java
new file mode 100644
index 0000000000..50ad8eb610
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/WrapperRecord.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.consume;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE;
+
+/**
+ * Record that wraps a parsed PubSub message payload together with information
+ * taken from the message itself. The wrapper has three fields:
+ * {@value #METADATA} (ack id, serialized size, message id, attribute count and
+ * publish time in seconds), {@value #ATTRIBUTES} (the message's attribute map)
+ * and {@value #VALUE} (the parsed payload record).
+ */
+public class WrapperRecord extends MapRecord {
+
+    public static final String METADATA = "metadata";
+    public static final String ATTRIBUTES = "attributes";
+    public static final String VALUE = "value";
+
+    private static final RecordField FIELD_ACK_ID_ATTRIBUTE = new RecordField(ACK_ID_ATTRIBUTE, RecordFieldType.STRING.getDataType());
+    private static final RecordField FIELD_SERIALIZED_SIZE_ATTRIBUTE = new RecordField(SERIALIZED_SIZE_ATTRIBUTE, RecordFieldType.INT.getDataType());
+    private static final RecordField FIELD_MESSAGE_ID_ATTRIBUTE = new RecordField(MESSAGE_ID_ATTRIBUTE, RecordFieldType.STRING.getDataType());
+    private static final RecordField FIELD_MSG_ATTRIBUTES_COUNT_ATTRIBUTE = new RecordField(MSG_ATTRIBUTES_COUNT_ATTRIBUTE, RecordFieldType.INT.getDataType());
+    private static final RecordField FIELD_MSG_PUBLISH_TIME_ATTRIBUTE = new RecordField(MSG_PUBLISH_TIME_ATTRIBUTE, RecordFieldType.LONG.getDataType());
+
+    /** Schema of the nested {@value #METADATA} record. */
+    public static final RecordSchema SCHEMA_METADATA = new SimpleRecordSchema(
+            Arrays.asList(FIELD_ACK_ID_ATTRIBUTE, FIELD_SERIALIZED_SIZE_ATTRIBUTE, FIELD_MESSAGE_ID_ATTRIBUTE, FIELD_MSG_ATTRIBUTES_COUNT_ATTRIBUTE, FIELD_MSG_PUBLISH_TIME_ATTRIBUTE));
+
+    public static final RecordField FIELD_METADATA = new RecordField(METADATA, RecordFieldType.RECORD.getRecordDataType(SCHEMA_METADATA));
+    public static final RecordField FIELD_ATTRIBUTES = new RecordField(ATTRIBUTES, RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
+
+    /**
+     * Builds the field values of the wrapper for one message.
+     *
+     * @param record parsed payload stored under {@value #VALUE}
+     * @param message received message supplying the metadata and attributes
+     * @return map holding the metadata record, the attribute map and the payload record
+     */
+    private static Map<String, Object> toValues(final Record record, final ReceivedMessage message) {
+        final Map<String, Object> valuesMetadata = new HashMap<>();
+        valuesMetadata.put(ACK_ID_ATTRIBUTE, message.getAckId());
+        valuesMetadata.put(SERIALIZED_SIZE_ATTRIBUTE, message.getSerializedSize());
+        valuesMetadata.put(MESSAGE_ID_ATTRIBUTE, message.getMessage().getMessageId());
+        valuesMetadata.put(MSG_ATTRIBUTES_COUNT_ATTRIBUTE, message.getMessage().getAttributesCount());
+        // Only the seconds part of the publish timestamp is kept, matching the LONG metadata field.
+        valuesMetadata.put(MSG_PUBLISH_TIME_ATTRIBUTE, message.getMessage().getPublishTime().getSeconds());
+        final Record recordMetadata = new MapRecord(SCHEMA_METADATA, valuesMetadata);
+
+        final Map<String, Object> valuesWrapper = new HashMap<>();
+        valuesWrapper.put(METADATA, recordMetadata);
+        valuesWrapper.put(ATTRIBUTES, message.getMessage().getAttributesMap());
+        valuesWrapper.put(VALUE, record);
+        return valuesWrapper;
+    }
+
+    /**
+     * Creates a wrapper around the given payload record.
+     *
+     * @param record parsed payload; its schema becomes the type of the {@value #VALUE} field
+     * @param message received message supplying the metadata and attributes
+     */
+    public WrapperRecord(final Record record, final ReceivedMessage message) {
+        // Reuse the public schema builder so the schema of each wrapper and the
+        // schema produced by toWrapperSchema cannot diverge.
+        super(toWrapperSchema(record.getSchema()), toValues(record, message));
+    }
+
+    /**
+     * Builds the wrapper schema for payload records of the given schema.
+     *
+     * @param recordSchema schema of the payload record
+     * @return schema with the metadata, attributes and value fields, in that order
+     */
+    public static RecordSchema toWrapperSchema(final RecordSchema recordSchema) {
+        final RecordField fieldValue = new RecordField(VALUE, RecordFieldType.RECORD.getRecordDataType(recordSchema));
+        return new SimpleRecordSchema(Arrays.asList(FIELD_METADATA, FIELD_ATTRIBUTES, fieldValue));
+    }
+
+}
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/WrapperRecordStreamPubSubMessageConverter.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/WrapperRecordStreamPubSubMessageConverter.java
new file mode 100644
index 0000000000..9e371d0949
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/WrapperRecordStreamPubSubMessageConverter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.consume;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+/**
+ * Converter variant that wraps every parsed record in a {@link WrapperRecord}
+ * and reports the matching wrapper schema.
+ */
+public class WrapperRecordStreamPubSubMessageConverter extends AbstractPubSubMessageConverter implements PubSubMessageConverter {
+
+    /**
+     * @param readerFactory reader factory handed to the base converter
+     * @param writerFactory writer factory handed to the base converter
+     * @param logger logger handed to the base converter
+     */
+    public WrapperRecordStreamPubSubMessageConverter(final RecordReaderFactory readerFactory, final RecordSetWriterFactory writerFactory, final ComponentLog logger) {
+        super(readerFactory, writerFactory, logger);
+    }
+
+    /**
+     * @return the wrapper schema built around the given payload schema
+     */
+    @Override
+    protected RecordSchema getRecordSchema(final RecordSchema recordSchema) {
+        final RecordSchema wrapperSchema = WrapperRecord.toWrapperSchema(recordSchema);
+        return wrapperSchema;
+    }
+
+    /**
+     * @return a new {@link WrapperRecord} built from the given record and message
+     */
+    @Override
+    protected Record getRecord(final Record record, final ReceivedMessage message) {
+        final Record wrapped = new WrapperRecord(record, message);
+        return wrapped;
+    }
+
+}
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubTest.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubTest.java
new file mode 100644
index 0000000000..59132c0f4e
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Empty;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.processor.ProcessContext;
+import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.processors.gcp.pubsub.consume.OutputStrategy;
+import org.apache.nifi.processors.gcp.pubsub.consume.ProcessingStrategy;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link ConsumeGCPubSub} covering the FlowFile, Demarcator and
+ * Record processing strategies. The PubSub subscriber is replaced by a mock
+ * whose pull response returns the contents of {@link #messages}.
+ */
+public class ConsumeGCPubSubTest {
+
+    private static final String SUBSCRIPTION = "my-subscription";
+    private static final String PROJECT = "my-project";
+    private static final String SUBSCRIPTION_FULL = "projects/" + PROJECT + "/subscriptions/" + SUBSCRIPTION;
+
+    private static final String WRITER_ID = "json-writer";
+    private static final String READER_ID = "json-reader";
+
+    private SubscriberStub subscriberMock;
+    private TestRunner runner;
+    // Backing list of the mocked pull response; tests add messages before running the processor.
+    private final List<ReceivedMessage> messages = new ArrayList<>();
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    /**
+     * Builds a runner around a processor whose {@code onScheduled} installs the
+     * mocked subscriber and reads the strategy properties.
+     */
+    @BeforeEach
+    @SuppressWarnings("unchecked") // Mockito cannot create mocks carrying generic type arguments
+    void setRunner() throws InitializationException {
+        subscriberMock = mock(SubscriberStub.class);
+
+        UnaryCallable<PullRequest, PullResponse> callable = mock(UnaryCallable.class);
+        PullResponse response = mock(PullResponse.class);
+
+        when(subscriberMock.pullCallable()).thenReturn(callable);
+        when(callable.call(any())).thenReturn(response);
+        when(response.getReceivedMessagesList()).thenReturn(messages);
+
+        UnaryCallable<AcknowledgeRequest, Empty> ackCallable = mock(UnaryCallable.class);
+        when(subscriberMock.acknowledgeCallable()).thenReturn(ackCallable);
+        when(ackCallable.call(any())).thenReturn(Empty.getDefaultInstance());
+
+        runner = TestRunners.newTestRunner(new ConsumeGCPubSub() {
+            @Override
+            @OnScheduled
+            public void onScheduled(ProcessContext context) {
+                subscriber = subscriberMock;
+
+                outputStrategy = context.getProperty(OUTPUT_STRATEGY).asAllowableValue(OutputStrategy.class);
+                processingStrategy = context.getProperty(PROCESSING_STRATEGY).asAllowableValue(ProcessingStrategy.class);
+                demarcatorValue = context.getProperty(MESSAGE_DEMARCATOR).getValue();
+            }
+        });
+
+        runner.setProperty(ConsumeGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner));
+        runner.setProperty(ConsumeGCPubSub.PROJECT_ID, PROJECT);
+        runner.setProperty(ConsumeGCPubSub.SUBSCRIPTION, SUBSCRIPTION);
+
+        messages.clear();
+    }
+
+    @Test
+    void testFlowFileStrategy() {
+        messages.add(createMessage("test1"));
+        messages.add(createMessage("test2"));
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(ConsumeGCPubSub.REL_SUCCESS, 2);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConsumeGCPubSub.REL_SUCCESS).iterator().next();
+        flowFile.assertContentEquals("test1");
+        flowFile.assertAttributeExists(PubSubAttributes.MESSAGE_ID_ATTRIBUTE);
+        flowFile.assertAttributeEquals(PubSubAttributes.SUBSCRIPTION_NAME_ATTRIBUTE, SUBSCRIPTION_FULL);
+        flowFile.assertAttributeEquals("attKey", "attValue");
+    }
+
+    @Test
+    void testDemarcatorStrategy() {
+        runner.setProperty(ConsumeGCPubSub.PROCESSING_STRATEGY, ProcessingStrategy.DEMARCATOR);
+        runner.setProperty(ConsumeGCPubSub.MESSAGE_DEMARCATOR, "\n");
+
+        messages.add(createMessage("test1"));
+        messages.add(createMessage("test2"));
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(ConsumeGCPubSub.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConsumeGCPubSub.REL_SUCCESS).iterator().next();
+        flowFile.assertContentEquals("test1\ntest2\n");
+        flowFile.assertAttributeNotExists(PubSubAttributes.MESSAGE_ID_ATTRIBUTE);
+        flowFile.assertAttributeEquals(PubSubAttributes.SUBSCRIPTION_NAME_ATTRIBUTE, SUBSCRIPTION_FULL);
+    }
+
+    @Test
+    void testRecordStrategyNoWrapper() throws InitializationException {
+        runner.setProperty(ConsumeGCPubSub.PROCESSING_STRATEGY, ProcessingStrategy.RECORD);
+        enableJsonWriterAndReader(false);
+
+        addMixedValidAndInvalidMessages();
+        runner.run(1);
+
+        runner.assertTransferCount(ConsumeGCPubSub.REL_SUCCESS, 1);
+        runner.assertTransferCount(ConsumeGCPubSub.REL_PARSE_FAILURE, 2);
+
+        final MockFlowFile flowFileSuccess = runner.getFlowFilesForRelationship(ConsumeGCPubSub.REL_SUCCESS).iterator().next();
+        flowFileSuccess.assertContentEquals("[{\"foo\":\"foo1\"},{\"foo\":\"foo2\"}]");
+        flowFileSuccess.assertAttributeNotExists(PubSubAttributes.MESSAGE_ID_ATTRIBUTE);
+        flowFileSuccess.assertAttributeEquals(PubSubAttributes.SUBSCRIPTION_NAME_ATTRIBUTE, SUBSCRIPTION_FULL);
+
+        assertFirstParseFailure();
+    }
+
+    @Test
+    void testRecordStrategyWithWrapper() throws InitializationException, JsonMappingException, JsonProcessingException {
+        runner.setProperty(ConsumeGCPubSub.PROCESSING_STRATEGY, ProcessingStrategy.RECORD);
+        runner.setProperty(ConsumeGCPubSub.OUTPUT_STRATEGY, OutputStrategy.USE_WRAPPER);
+        enableJsonWriterAndReader(true);
+
+        addMixedValidAndInvalidMessages();
+        runner.run(1);
+
+        runner.assertTransferCount(ConsumeGCPubSub.REL_SUCCESS, 1);
+        runner.assertTransferCount(ConsumeGCPubSub.REL_PARSE_FAILURE, 2);
+
+        final String expected = """
+                [ {
+                  "metadata" : {
+                    "gcp.pubsub.ackId" : "ackId",
+                    "gcp.pubsub.messageSize" : 56,
+                    "gcp.pubsub.messageId" : "messageId",
+                    "gcp.pubsub.attributesCount" : 1,
+                    "gcp.pubsub.publishTime" : 0
+                  },
+                  "attributes" : {
+                    "attKey" : "attValue"
+                  },
+                  "value" : {
+                    "foo" : "foo1"
+                  }
+                }, {
+                  "metadata" : {
+                    "gcp.pubsub.ackId" : "ackId",
+                    "gcp.pubsub.messageSize" : 56,
+                    "gcp.pubsub.messageId" : "messageId",
+                    "gcp.pubsub.attributesCount" : 1,
+                    "gcp.pubsub.publishTime" : 0
+                  },
+                  "attributes" : {
+                    "attKey" : "attValue"
+                  },
+                  "value" : {
+                    "foo" : "foo2"
+                  }
+                } ]""";
+
+        final MockFlowFile flowFileSuccess = runner.getFlowFilesForRelationship(ConsumeGCPubSub.REL_SUCCESS).iterator().next();
+        final String content = flowFileSuccess.getContent();
+        // Compare parsed trees so whitespace differences are irrelevant; expected value goes first.
+        assertEquals(mapper.readTree(expected), mapper.readTree(content));
+        flowFileSuccess.assertAttributeNotExists(PubSubAttributes.MESSAGE_ID_ATTRIBUTE);
+        flowFileSuccess.assertAttributeEquals(PubSubAttributes.SUBSCRIPTION_NAME_ATTRIBUTE, SUBSCRIPTION_FULL);
+
+        assertFirstParseFailure();
+    }
+
+    /**
+     * Registers and enables a JSON record writer and reader and assigns them to the processor.
+     *
+     * @param prettyPrint whether the writer's pretty-print property is set to {@code true}
+     */
+    private void enableJsonWriterAndReader(final boolean prettyPrint) throws InitializationException {
+        final JsonRecordSetWriter writer = new JsonRecordSetWriter();
+        runner.addControllerService(WRITER_ID, writer);
+        if (prettyPrint) {
+            runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true");
+        }
+        runner.enableControllerService(writer);
+        runner.setProperty(ConsumeGCPubSub.RECORD_WRITER, WRITER_ID);
+
+        final JsonTreeReader reader = new JsonTreeReader();
+        runner.addControllerService(READER_ID, reader);
+        runner.enableControllerService(reader);
+        runner.setProperty(ConsumeGCPubSub.RECORD_READER, READER_ID);
+    }
+
+    /** Queues two JSON messages interleaved with two plain-text messages. */
+    private void addMixedValidAndInvalidMessages() {
+        messages.add(createMessage("{\"foo\":\"foo1\"}"));
+        messages.add(createMessage("test2"));
+        messages.add(createMessage("{\"foo\":\"foo2\"}"));
+        messages.add(createMessage("test3"));
+    }
+
+    /** Verifies content and attributes of the first FlowFile routed to parse failure. */
+    private void assertFirstParseFailure() {
+        final MockFlowFile flowFileParseFailure = runner.getFlowFilesForRelationship(ConsumeGCPubSub.REL_PARSE_FAILURE).iterator().next();
+        flowFileParseFailure.assertContentEquals("test2");
+        flowFileParseFailure.assertAttributeExists(PubSubAttributes.MESSAGE_ID_ATTRIBUTE);
+        flowFileParseFailure.assertAttributeEquals(PubSubAttributes.SUBSCRIPTION_NAME_ATTRIBUTE, SUBSCRIPTION_FULL);
+        flowFileParseFailure.assertAttributeEquals("attKey", "attValue");
+    }
+
+    /**
+     * Builds a received message with fixed ack id, message id and a single attribute.
+     *
+     * @param content payload, encoded as UTF-8 regardless of the platform default charset
+     */
+    private ReceivedMessage createMessage(String content) {
+        return ReceivedMessage.newBuilder()
+                .setMessage(PubsubMessage.newBuilder()
+                        .setData(ByteString.copyFromUtf8(content))
+                        .putAttributes("attKey", "attValue")
+                        .setMessageId("messageId")
+                        .build())
+                .setAckId("ackId")
+                .build();
+    }
+
+    /** Registers and enables a mocked credentials service and returns its identifier. */
+    private static String getCredentialsServiceId(final TestRunner runner) throws InitializationException {
+        final ControllerService controllerService = mock(GCPCredentialsControllerService.class);
+        final String controllerServiceId = GCPCredentialsControllerService.class.getSimpleName();
+        when(controllerService.getIdentifier()).thenReturn(controllerServiceId);
+        runner.addControllerService(controllerServiceId, controllerService);
+        runner.enableControllerService(controllerService);
+        return controllerServiceId;
+    }
+}

Reply via email to