This is an automated email from the ASF dual-hosted git repository.
lehelb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new fb17c10b2f NIFI-14020 - Add Record and Demarcator support to
ConsumeGCPubSub
fb17c10b2f is described below
commit fb17c10b2fc2ebf78f96f7e9b105f2143c3b5235
Author: Pierre Villard <[email protected]>
AuthorDate: Mon Nov 18 13:32:48 2024 +0100
NIFI-14020 - Add Record and Demarcator support to ConsumeGCPubSub
This closes #9530.
Signed-off-by: Lehel Boer <[email protected]>
---
.../processors/gcp/pubsub/ConsumeGCPubSub.java | 208 +++++++++++++++--
.../processors/gcp/pubsub/PubSubAttributes.java | 3 +
.../consume/AbstractPubSubMessageConverter.java | 173 ++++++++++++++
.../gcp/pubsub/consume/OutputStrategy.java | 52 +++++
.../gcp/pubsub/consume/ProcessingStrategy.java | 52 +++++
.../gcp/pubsub/consume/PubSubMessageConverter.java | 32 +++
.../RecordStreamPubSubMessageConverter.java | 44 ++++
.../gcp/pubsub/consume/WrapperRecord.java | 84 +++++++
.../WrapperRecordStreamPubSubMessageConverter.java | 45 ++++
.../processors/gcp/pubsub/ConsumeGCPubSubTest.java | 251 +++++++++++++++++++++
10 files changed, 919 insertions(+), 25 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
index 8eae56ec59..bde5fc788e 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
@@ -39,6 +39,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@@ -46,9 +47,19 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.pubsub.consume.OutputStrategy;
+import org.apache.nifi.processors.gcp.pubsub.consume.ProcessingStrategy;
+import org.apache.nifi.processors.gcp.pubsub.consume.PubSubMessageConverter;
+import
org.apache.nifi.processors.gcp.pubsub.consume.RecordStreamPubSubMessageConverter;
+import
org.apache.nifi.processors.gcp.pubsub.consume.WrapperRecordStreamPubSubMessageConverter;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -58,6 +69,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE;
import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_DESCRIPTION;
import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE;
@@ -69,19 +81,32 @@ import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH
import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION;
import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE;
import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_DESCRIPTION;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SUBSCRIPTION_NAME_ATTRIBUTE;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SUBSCRIPTION_NAME_DESCRIPTION;
-@SeeAlso({PublishGCPubSub.class})
+@SeeAlso({ PublishGCPubSub.class })
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
-@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "consume"})
-@CapabilityDescription("Consumes message from the configured Google Cloud
PubSub subscription. If the 'Batch Size' is set, " +
- "the configured number of messages will be pulled in a single request,
else only one message will be pulled.")
-@WritesAttributes({
- @WritesAttribute(attribute = ACK_ID_ATTRIBUTE, description =
ACK_ID_DESCRIPTION),
- @WritesAttribute(attribute = SERIALIZED_SIZE_ATTRIBUTE, description =
SERIALIZED_SIZE_DESCRIPTION),
- @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE,
description = MSG_ATTRIBUTES_COUNT_DESCRIPTION),
- @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description =
MSG_PUBLISH_TIME_DESCRIPTION),
- @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description
= DYNAMIC_ATTRIBUTES_DESCRIPTION)
-})
+@Tags({ "google", "google-cloud", "gcp", "message", "pubsub", "consume" })
+@CapabilityDescription(
+ """
+ Consumes messages from the configured Google Cloud PubSub subscription.
The 'Batch Size' property specifies the maximum
+ number of messages that will be pulled from the subscription in a single
request. The 'Processing Strategy' property
+ specifies if each message should be its own FlowFile or if messages
should be grouped into a single FlowFile. Using the
+ Demarcator strategy will provide best throughput when the format allows
it. Using Record allows converting the data format
+ as well as enforcing a schema. Using the FlowFile strategy will
generate one FlowFile per message and will have
+ the message's attributes as FlowFile attributes.
+ """
+)
+@WritesAttributes(
+ {
+ @WritesAttribute(attribute = ACK_ID_ATTRIBUTE, description =
ACK_ID_DESCRIPTION),
+ @WritesAttribute(attribute = SERIALIZED_SIZE_ATTRIBUTE,
description = SERIALIZED_SIZE_DESCRIPTION),
+ @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE,
description = MSG_ATTRIBUTES_COUNT_DESCRIPTION),
+ @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE,
description = MSG_PUBLISH_TIME_DESCRIPTION),
+ @WritesAttribute(attribute = SUBSCRIPTION_NAME_ATTRIBUTE,
description = SUBSCRIPTION_NAME_DESCRIPTION),
+ @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE,
description = DYNAMIC_ATTRIBUTES_DESCRIPTION)
+ }
+)
public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
private static final List<String> REQUIRED_PERMISSIONS =
Collections.singletonList("pubsub.subscriptions.consume");
@@ -95,21 +120,81 @@ public class ConsumeGCPubSub extends
AbstractGCPubSubProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
+ public static final PropertyDescriptor PROCESSING_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("Processing Strategy")
+ .description("Strategy for processing PubSub messages and writing
serialized output to FlowFiles")
+ .required(true)
+ .allowableValues(ProcessingStrategy.class)
+ .defaultValue(ProcessingStrategy.FLOW_FILE.getValue())
+ .expressionLanguageSupported(NONE)
+ .build();
+
+ public static final PropertyDescriptor RECORD_READER = new
PropertyDescriptor.Builder()
+ .name("Record Reader")
+ .description("The Record Reader to use for incoming messages")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD)
+ .build();
+
+ public static final PropertyDescriptor RECORD_WRITER = new
PropertyDescriptor.Builder()
+ .name("Record Writer")
+ .description("The Record Writer to use in order to serialize the
outgoing FlowFiles")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .required(true)
+ .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD)
+ .build();
+
+ public static final PropertyDescriptor OUTPUT_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("Output Strategy")
+ .description("The format used to output the PubSub message into a
FlowFile Record.")
+ .required(true)
+ .defaultValue(OutputStrategy.USE_VALUE)
+ .allowableValues(OutputStrategy.class)
+ .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD)
+ .build();
+
+ public static final PropertyDescriptor MESSAGE_DEMARCATOR = new
PropertyDescriptor.Builder()
+ .name("Message Demarcator")
+ .required(true)
+ .addValidator(Validator.VALID)
+ .description(
+ """
+ Since the PubSub client receives messages in batches, this
Processor has an option to output FlowFiles
+ which contain all the messages in a single batch. This
property allows you to provide a string
+ (interpreted as UTF-8) to use for demarcating multiple
messages. To enter special characters
+ such as 'new line' use CTRL+Enter or Shift+Enter depending on
the OS.
+ """)
+ .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.DEMARCATOR)
+ .build();
+
+ public static final Relationship REL_PARSE_FAILURE = new
Relationship.Builder()
+ .name("parse failure")
+ .description("If configured to use a Record Reader, a PubSub
message that cannot be parsed using the configured Record Reader will be routed
to this relationship")
+ .build();
+
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
List.of(
GCP_CREDENTIALS_PROVIDER_SERVICE,
PROJECT_ID,
SUBSCRIPTION,
BATCH_SIZE_THRESHOLD,
+ PROCESSING_STRATEGY,
+ RECORD_READER,
+ RECORD_WRITER,
+ OUTPUT_STRATEGY,
+ MESSAGE_DEMARCATOR,
API_ENDPOINT,
- PROXY_CONFIGURATION_SERVICE
- );
+ PROXY_CONFIGURATION_SERVICE);
- public static final Set<Relationship> RELATIONSHIPS = Set.of(
- REL_SUCCESS
- );
+ private static final Set<Relationship> SUCCESS_RELATIONSHIP =
Set.of(REL_SUCCESS);
+ private static final Set<Relationship> SUCCESS_FAILURE_RELATIONSHIPS =
Set.of(REL_SUCCESS, REL_PARSE_FAILURE);
- private SubscriberStub subscriber = null;
+ protected SubscriberStub subscriber = null;
private PullRequest pullRequest;
+ protected volatile OutputStrategy outputStrategy;
+ protected volatile ProcessingStrategy processingStrategy;
+ private volatile boolean useReader;
+ protected volatile String demarcatorValue;
private final AtomicReference<Exception> storedException = new
AtomicReference<>();
@@ -120,9 +205,17 @@ public class ConsumeGCPubSub extends
AbstractGCPubSubProcessor {
@Override
public Set<Relationship> getRelationships() {
- return RELATIONSHIPS;
+ return useReader ? SUCCESS_FAILURE_RELATIONSHIPS :
SUCCESS_RELATIONSHIP;
+ }
+
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final
String oldValue, final String newValue) {
+ if (descriptor.equals(RECORD_READER)) {
+ useReader = newValue != null;
+ }
}
+ @Override
@OnScheduled
public void onScheduled(ProcessContext context) {
final Integer batchSize =
context.getProperty(BATCH_SIZE_THRESHOLD).asInteger();
@@ -138,6 +231,10 @@ public class ConsumeGCPubSub extends
AbstractGCPubSubProcessor {
storedException.set(e);
getLogger().error("Failed to create Google Cloud Subscriber", e);
}
+
+ outputStrategy =
context.getProperty(OUTPUT_STRATEGY).asAllowableValue(OutputStrategy.class);
+ processingStrategy =
context.getProperty(PROCESSING_STRATEGY).asAllowableValue(ProcessingStrategy.class);
+ demarcatorValue = context.getProperty(MESSAGE_DEMARCATOR).getValue();
}
@Override
@@ -227,30 +324,91 @@ public class ConsumeGCPubSub extends
AbstractGCPubSubProcessor {
final PullResponse pullResponse =
subscriber.pullCallable().call(pullRequest);
final List<String> ackIds = new ArrayList<>();
final String subscriptionName = getSubscriptionName(context, null);
+ List<ReceivedMessage> receivedMessages =
pullResponse.getReceivedMessagesList();
- for (ReceivedMessage message : pullResponse.getReceivedMessagesList())
{
+ switch (processingStrategy) {
+ case RECORD -> processInputRecords(context, session, receivedMessages,
subscriptionName, ackIds);
+ case FLOW_FILE -> processInputFlowFile(session, receivedMessages,
subscriptionName, ackIds);
+ case DEMARCATOR -> processInputDemarcator(session, receivedMessages,
subscriptionName, ackIds);
+ }
+
+ session.commitAsync(() -> acknowledgeAcks(ackIds, subscriptionName));
+ }
+
+ private void processInputDemarcator(final ProcessSession session, final
List<ReceivedMessage> receivedMessages, final String subscriptionName,
+ final List<String> ackIds) {
+ final byte[] demarcator = demarcatorValue == null ? new byte[0] :
demarcatorValue.getBytes(StandardCharsets.UTF_8);
+ FlowFile flowFile = session.create();
+
+ try {
+ flowFile = session.write(flowFile, new OutputStreamCallback() {
+ @Override
+ public void process(OutputStream out) throws IOException {
+ for (ReceivedMessage message : receivedMessages) {
+ if (message.hasMessage()) {
+
out.write(message.getMessage().getData().toByteArray());
+ out.write(demarcator);
+ ackIds.add(message.getAckId());
+ }
+ }
+ }
+ });
+ session.putAttribute(flowFile, SUBSCRIPTION_NAME_ATTRIBUTE,
subscriptionName);
+ } catch (Exception e) {
+ ackIds.clear();
+ session.remove(flowFile);
+ throw new ProcessException("Failed to write batch of messages in
FlowFile", e);
+ }
+
+ if (flowFile.getSize() > 0) {
+ session.putAttribute(flowFile, "record.count",
String.valueOf(ackIds.size()));
+ session.transfer(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().receive(flowFile,
subscriptionName);
+ session.adjustCounter("Records Received from " + subscriptionName,
ackIds.size(), false);
+ } else {
+ session.remove(flowFile);
+ }
+ }
+
+ private void processInputFlowFile(final ProcessSession session, final
List<ReceivedMessage> receivedMessages, final String subscriptionName, final
List<String> ackIds) {
+ for (ReceivedMessage message : receivedMessages) {
if (message.hasMessage()) {
FlowFile flowFile = session.create();
final Map<String, String> attributes = new HashMap<>();
- ackIds.add(message.getAckId());
-
attributes.put(ACK_ID_ATTRIBUTE, message.getAckId());
attributes.put(SERIALIZED_SIZE_ATTRIBUTE,
String.valueOf(message.getSerializedSize()));
attributes.put(MESSAGE_ID_ATTRIBUTE,
message.getMessage().getMessageId());
attributes.put(MSG_ATTRIBUTES_COUNT_ATTRIBUTE,
String.valueOf(message.getMessage().getAttributesCount()));
attributes.put(MSG_PUBLISH_TIME_ATTRIBUTE,
String.valueOf(message.getMessage().getPublishTime().getSeconds()));
+ attributes.put(SUBSCRIPTION_NAME_ATTRIBUTE, subscriptionName);
attributes.putAll(message.getMessage().getAttributesMap());
flowFile = session.putAllAttributes(flowFile, attributes);
flowFile = session.write(flowFile, out ->
out.write(message.getMessage().getData().toByteArray()));
+ ackIds.add(message.getAckId());
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().receive(flowFile,
subscriptionName);
}
}
+ }
- session.commitAsync(() -> acknowledgeAcks(ackIds, subscriptionName));
+ private void processInputRecords(final ProcessContext context, final
ProcessSession session, final List<ReceivedMessage> receivedMessages, final
String subscriptionName,
+ final List<String> ackIds) {
+ final RecordReaderFactory readerFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+ PubSubMessageConverter converter;
+ if (OutputStrategy.USE_VALUE.equals(outputStrategy)) {
+ converter = new RecordStreamPubSubMessageConverter(readerFactory,
writerFactory, getLogger());
+ } else if (OutputStrategy.USE_WRAPPER.equals(outputStrategy)) {
+ converter = new
WrapperRecordStreamPubSubMessageConverter(readerFactory, writerFactory,
getLogger());
+ } else {
+ throw new ProcessException(String.format("Output Strategy not
supported [%s]", outputStrategy));
+ }
+
+ converter.toFlowFiles(session, receivedMessages, ackIds,
subscriptionName);
}
private void acknowledgeAcks(final Collection<String> ackIds, final String
subscriptionName) {
@@ -259,9 +417,9 @@ public class ConsumeGCPubSub extends
AbstractGCPubSubProcessor {
}
AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()
- .addAllAckIds(ackIds)
- .setSubscription(subscriptionName)
- .build();
+ .addAllAckIds(ackIds)
+ .setSubscription(subscriptionName)
+ .build();
subscriber.acknowledgeCallable().call(acknowledgeRequest);
}
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java
index b9654112d4..b2df8ded14 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java
@@ -39,6 +39,9 @@ public class PubSubAttributes {
public static final String MSG_PUBLISH_TIME_ATTRIBUTE =
"gcp.pubsub.publishTime";
public static final String MSG_PUBLISH_TIME_DESCRIPTION = "Timestamp value
when the message was published";
+ public static final String SUBSCRIPTION_NAME_ATTRIBUTE =
"gcp.pubsub.subscription";
+ public static final String SUBSCRIPTION_NAME_DESCRIPTION = "Name of the
PubSub subscription";
+
public static final String DYNAMIC_ATTRIBUTES_ATTRIBUTE = "Dynamic
Attributes";
public static final String DYNAMIC_ATTRIBUTES_DESCRIPTION = "Other than
the listed attributes, this processor may write zero or more attributes, " +
"if the original Google Cloud Publisher client added any
attributes to the message while sending";
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/AbstractPubSubMessageConverter.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/AbstractPubSubMessageConverter.java
new file mode 100644
index 0000000000..eb4f449264
--- /dev/null
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/AbstractPubSubMessageConverter.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.consume;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub;
+import org.apache.nifi.provenance.ProvenanceReporter;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SUBSCRIPTION_NAME_ATTRIBUTE;
+
+public abstract class AbstractPubSubMessageConverter implements
PubSubMessageConverter {
+
+ protected final RecordReaderFactory readerFactory;
+ protected final RecordSetWriterFactory writerFactory;
+ protected final ComponentLog logger;
+
+ protected AbstractPubSubMessageConverter(
+ final RecordReaderFactory readerFactory,
+ final RecordSetWriterFactory writerFactory,
+ final ComponentLog logger) {
+ this.readerFactory = readerFactory;
+ this.writerFactory = writerFactory;
+ this.logger = logger;
+ }
+
+ @Override
+ public void toFlowFiles(final ProcessSession session, final
List<ReceivedMessage> messages, final List<String> ackIds, final String
subscriptionName) {
+ try {
+ final Map<RecordSchema, RecordGroup> recordGroups = new
HashMap<>();
+ final Map<String, String> attributes = new HashMap<>();
+
+ for (ReceivedMessage message : messages) {
+
+ if (message.hasMessage()) {
+ byte[] payload =
message.getMessage().getData().toByteArray();
+ try (final InputStream in = new
ByteArrayInputStream(payload);
+ final RecordReader valueRecordReader =
readerFactory.createRecordReader(attributes, in, payload.length, logger)) {
+
+ Record record;
+ while ((record = valueRecordReader.nextRecord()) !=
null) {
+
+ final RecordSchema recordSchema =
record.getSchema();
+ final RecordSchema writeSchema =
writerFactory.getSchema(attributes, getRecordSchema(recordSchema));
+
+ // Get/Register the Record Group is associated
with the schema for this message
+ final RecordGroup recordGroup =
recordGroups.computeIfAbsent(writeSchema, schema -> {
+ final FlowFile flowFile = session.create();
+ try {
+ final OutputStream out =
session.write(flowFile);
+ final RecordSetWriter writer =
writerFactory.createWriter(logger, writeSchema, out, attributes);
+ writer.beginRecordSet();
+ return new RecordGroup(flowFile, writer);
+ } catch (Exception e) {
+ session.remove(flowFile);
+ throw new ProcessException("Failed to
create RecordSetWriter", e);
+ }
+ });
+
+ // Create the Record object and write it to the
Record Writer.
+ recordGroup.writer().write(getRecord(record,
message));
+ }
+
+ } catch (final MalformedRecordException | IOException |
SchemaNotFoundException e) {
+ logger.error("Failed to parse the record. Transfer to
a 'parse failure' relationship", e);
+ handleParseFailure(session, message, payload, ackIds,
subscriptionName);
+ continue;
+ }
+ }
+
+ // Track the ack ID for the message
+ ackIds.add(message.getAckId());
+ }
+
+ // Finish writing the records
+ finishRecordGroups(session, recordGroups, subscriptionName);
+
+ } catch (final Exception e) {
+ throw new ProcessException("FlowFile Record conversion failed", e);
+ }
+ }
+
+ protected abstract Record getRecord(Record record, ReceivedMessage
message);
+
+ protected abstract RecordSchema getRecordSchema(RecordSchema recordSchema);
+
+ private void handleParseFailure(final ProcessSession session, final
ReceivedMessage message, final byte[] payload,
+ final List<String> ackIds, final String subscriptionName) {
+ // Failed to parse the record. Transfer to the 'parse failure'
relationship
+ FlowFile flowFile = session.create();
+ flowFile = session.putAllAttributes(flowFile,
message.getMessage().getAttributesMap());
+ flowFile = session.putAttribute(flowFile, ACK_ID_ATTRIBUTE,
message.getAckId());
+ flowFile = session.putAttribute(flowFile, SERIALIZED_SIZE_ATTRIBUTE,
String.valueOf(message.getSerializedSize()));
+ flowFile = session.putAttribute(flowFile, MESSAGE_ID_ATTRIBUTE,
message.getMessage().getMessageId());
+ flowFile = session.putAttribute(flowFile,
MSG_ATTRIBUTES_COUNT_ATTRIBUTE,
String.valueOf(message.getMessage().getAttributesCount()));
+ flowFile = session.putAttribute(flowFile, MSG_PUBLISH_TIME_ATTRIBUTE,
String.valueOf(message.getMessage().getPublishTime().getSeconds()));
+ flowFile = session.putAttribute(flowFile, SUBSCRIPTION_NAME_ATTRIBUTE,
subscriptionName);
+ flowFile = session.write(flowFile, out -> out.write(payload));
+ session.transfer(flowFile, ConsumeGCPubSub.REL_PARSE_FAILURE);
+ session.adjustCounter("Records Received from " + subscriptionName, 1,
false);
+
+ // Track the ack ID for the message
+ ackIds.add(message.getAckId());
+ }
+
+ private void finishRecordGroups(ProcessSession session, Map<RecordSchema,
RecordGroup> recordGroups, String subscriptionName) {
+ for (final RecordGroup recordGroup : recordGroups.values()) {
+ final Map<String, String> newAttributes;
+ final int recordCount;
+ try (final RecordSetWriter writer = recordGroup.writer()) {
+ final WriteResult writeResult = writer.finishRecordSet();
+ newAttributes = new HashMap<>(writeResult.getAttributes());
+ newAttributes.put("record.count",
String.valueOf(writeResult.getRecordCount()));
+ newAttributes.put(CoreAttributes.MIME_TYPE.key(),
writer.getMimeType());
+ newAttributes.put(SUBSCRIPTION_NAME_ATTRIBUTE,
subscriptionName);
+ recordCount = writeResult.getRecordCount();
+ } catch (IOException e) {
+ throw new ProcessException("Failed to finish writing records",
e);
+ }
+
+ FlowFile flowFile = recordGroup.flowFile();
+ flowFile = session.putAllAttributes(flowFile, newAttributes);
+ final ProvenanceReporter provenanceReporter =
session.getProvenanceReporter();
+ provenanceReporter.receive(flowFile, subscriptionName);
+ session.transfer(flowFile, ConsumeGCPubSub.REL_SUCCESS);
+ session.adjustCounter("Records Received from " + subscriptionName,
recordCount, false);
+ }
+ }
+
+ private record RecordGroup(FlowFile flowFile, RecordSetWriter writer) {
+ }
+
+}
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/OutputStrategy.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/OutputStrategy.java
new file mode 100644
index 0000000000..aab5ca7238
--- /dev/null
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/OutputStrategy.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.consume;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Enumeration of supported PubSub Output Strategies
+ */
+public enum OutputStrategy implements DescribedValue {
+ USE_VALUE("USE_VALUE", "Use Content as Value", "Write only the message
payload to the FlowFile record."),
+ USE_WRAPPER("USE_WRAPPER", "Use Wrapper", "Write the message's attributes
into the FlowFile record.");
+
+ private final String value;
+ private final String displayName;
+ private final String description;
+
+ OutputStrategy(final String value, final String displayName, final String
description) {
+ this.value = value;
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/ProcessingStrategy.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/ProcessingStrategy.java
new file mode 100644
index 0000000000..d1ebc3f0c4
--- /dev/null
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/ProcessingStrategy.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.consume;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Enumeration of supporting strategies for consuming and serializing PubSub
+ * messages
+ */
+public enum ProcessingStrategy implements DescribedValue {
+ FLOW_FILE("Write one FlowFile for each PubSub message consumed"),
+
+ DEMARCATOR("Write one FlowFile for each batch of PubSub messages
consumed"),
+
+ RECORD("Write one FlowFile containing multiple PubSub messages consumed
and processed with Record Reader and Record Writer");
+
+ private final String description;
+
+ ProcessingStrategy(final String description) {
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return name();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return name();
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/PubSubMessageConverter.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/PubSubMessageConverter.java
new file mode 100644
index 0000000000..33c6dc5fce
--- /dev/null
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/PubSubMessageConverter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.consume;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.nifi.processor.ProcessSession;
+
+import java.util.List;
+
/**
 * Implementations of {@link PubSubMessageConverter} employ specialized
 * strategies for converting PubSub messages into NiFi FlowFiles.
 */
public interface PubSubMessageConverter {

    /**
     * Converts the supplied batch of pulled PubSub messages into one or more FlowFiles
     * using the given session.
     *
     * @param session the ProcessSession used to create, write and transfer FlowFiles
     * @param messages the messages pulled from the PubSub subscription
     * @param ackIds list of ack ids — presumably populated by implementations with the
     *        ids of messages that were handled and may be acknowledged; TODO confirm
     *        against AbstractPubSubMessageConverter
     * @param subscriptionName the fully qualified name of the subscription the
     *        messages were pulled from
     */
    void toFlowFiles(final ProcessSession session, final List<ReceivedMessage> messages, final List<String> ackIds, final String subscriptionName);

}
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/RecordStreamPubSubMessageConverter.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/RecordStreamPubSubMessageConverter.java
new file mode 100644
index 0000000000..75eb2e83be
--- /dev/null
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/RecordStreamPubSubMessageConverter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.consume;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class RecordStreamPubSubMessageConverter extends
AbstractPubSubMessageConverter implements PubSubMessageConverter {
+
+ public RecordStreamPubSubMessageConverter(
+ final RecordReaderFactory readerFactory,
+ final RecordSetWriterFactory writerFactory,
+ final ComponentLog logger) {
+ super(readerFactory, writerFactory, logger);
+ }
+
+ @Override
+ protected Record getRecord(Record record, ReceivedMessage message) {
+ return record;
+ }
+
+ @Override
+ protected RecordSchema getRecordSchema(RecordSchema recordSchema) {
+ return recordSchema;
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/WrapperRecord.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/WrapperRecord.java
new file mode 100644
index 0000000000..50ad8eb610
--- /dev/null
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/WrapperRecord.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.consume;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE;
+
+public class WrapperRecord extends MapRecord {
+
+ public static final String METADATA = "metadata";
+ public static final String ATTRIBUTES = "attributes";
+ public static final String VALUE = "value";
+
+ private static final RecordField FIELD_ACK_ID_ATTRIBUTE = new
RecordField(ACK_ID_ATTRIBUTE, RecordFieldType.STRING.getDataType());
+ private static final RecordField FIELD_SERIALIZED_SIZE_ATTRIBUTE = new
RecordField(SERIALIZED_SIZE_ATTRIBUTE, RecordFieldType.INT.getDataType());
+ private static final RecordField FIELD_MESSAGE_ID_ATTRIBUTE = new
RecordField(MESSAGE_ID_ATTRIBUTE, RecordFieldType.STRING.getDataType());
+ private static final RecordField FIELD_MSG_ATTRIBUTES_COUNT_ATTRIBUTE =
new RecordField(MSG_ATTRIBUTES_COUNT_ATTRIBUTE,
RecordFieldType.INT.getDataType());
+ private static final RecordField FIELD_MSG_PUBLISH_TIME_ATTRIBUTE = new
RecordField(MSG_PUBLISH_TIME_ATTRIBUTE, RecordFieldType.LONG.getDataType());
+ public static final RecordSchema SCHEMA_METADATA = new SimpleRecordSchema(
+ Arrays.asList(FIELD_ACK_ID_ATTRIBUTE,
FIELD_SERIALIZED_SIZE_ATTRIBUTE, FIELD_MESSAGE_ID_ATTRIBUTE,
FIELD_MSG_ATTRIBUTES_COUNT_ATTRIBUTE, FIELD_MSG_PUBLISH_TIME_ATTRIBUTE));
+
+ public static final RecordField FIELD_METADATA = new RecordField(METADATA,
RecordFieldType.RECORD.getRecordDataType(SCHEMA_METADATA));
+ public static final RecordField FIELD_ATTRIBUTES = new
RecordField(ATTRIBUTES,
RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
+
+ private static RecordSchema toRecordSchema(final Record record) {
+ final RecordField fieldValue = new RecordField(VALUE,
RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+ return new SimpleRecordSchema(Arrays.asList(FIELD_METADATA,
FIELD_ATTRIBUTES, fieldValue));
+ }
+
+ private static Map<String, Object> toValues(final Record record,
ReceivedMessage message) {
+ final Map<String, Object> valuesMetadata = new HashMap<>();
+ valuesMetadata.put(ACK_ID_ATTRIBUTE, message.getAckId());
+ valuesMetadata.put(SERIALIZED_SIZE_ATTRIBUTE,
message.getSerializedSize());
+ valuesMetadata.put(MESSAGE_ID_ATTRIBUTE,
message.getMessage().getMessageId());
+ valuesMetadata.put(MSG_ATTRIBUTES_COUNT_ATTRIBUTE,
message.getMessage().getAttributesCount());
+ valuesMetadata.put(MSG_PUBLISH_TIME_ATTRIBUTE,
message.getMessage().getPublishTime().getSeconds());
+ final Record recordMetadata = new MapRecord(SCHEMA_METADATA,
valuesMetadata);
+
+ final Map<String, Object> valuesWrapper = new HashMap<>();
+ valuesWrapper.put(METADATA, recordMetadata);
+ valuesWrapper.put(ATTRIBUTES, message.getMessage().getAttributesMap());
+ valuesWrapper.put(VALUE, record);
+ return valuesWrapper;
+ }
+
+ public WrapperRecord(final Record record, ReceivedMessage message) {
+ super(toRecordSchema(record), toValues(record, message));
+ }
+
+ public static RecordSchema toWrapperSchema(final RecordSchema
recordSchema) {
+ final RecordField fieldValue = new RecordField(VALUE,
RecordFieldType.RECORD.getRecordDataType(recordSchema));
+ return new SimpleRecordSchema(Arrays.asList(FIELD_METADATA,
FIELD_ATTRIBUTES, fieldValue));
+ }
+
+}
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/WrapperRecordStreamPubSubMessageConverter.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/WrapperRecordStreamPubSubMessageConverter.java
new file mode 100644
index 0000000000..9e371d0949
--- /dev/null
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/consume/WrapperRecordStreamPubSubMessageConverter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.consume;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class WrapperRecordStreamPubSubMessageConverter extends
AbstractPubSubMessageConverter implements PubSubMessageConverter {
+
+ public WrapperRecordStreamPubSubMessageConverter(
+ final RecordReaderFactory readerFactory,
+ final RecordSetWriterFactory writerFactory,
+ final ComponentLog logger) {
+ super(readerFactory, writerFactory, logger);
+ }
+
+ @Override
+ protected Record getRecord(Record record, ReceivedMessage message) {
+ return new WrapperRecord(record, message);
+ }
+
+ @Override
+ protected RecordSchema getRecordSchema(RecordSchema recordSchema) {
+ return WrapperRecord.toWrapperSchema(recordSchema);
+ }
+
+}
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubTest.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubTest.java
new file mode 100644
index 0000000000..59132c0f4e
--- /dev/null
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Empty;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.processor.ProcessContext;
+import
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.processors.gcp.pubsub.consume.OutputStrategy;
+import org.apache.nifi.processors.gcp.pubsub.consume.ProcessingStrategy;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
 * Unit tests for {@link ConsumeGCPubSub} covering the FLOW_FILE, DEMARCATOR and
 * RECORD processing strategies (the latter with and without the wrapper output
 * strategy). The gRPC {@link SubscriberStub} is replaced with a Mockito mock so
 * pull and acknowledge calls are served from the in-memory {@code messages} list.
 */
public class ConsumeGCPubSubTest {

    private static final String SUBSCRIPTION = "my-subscription";
    private static final String PROJECT = "my-project";
    // fully qualified subscription name as exposed in the gcp.pubsub.subscriptionName attribute
    private static final String SUBSCRIPTION_FULL = "projects/" + PROJECT + "/subscriptions/" + SUBSCRIPTION;

    private SubscriberStub subscriberMock;
    private TestRunner runner;
    // messages returned by the mocked pull call; populated per test, cleared in setup
    private List<ReceivedMessage> messages = new ArrayList<>();
    private ObjectMapper mapper = new ObjectMapper();

    @BeforeEach
    void setRunner() throws InitializationException {
        subscriberMock = mock(SubscriberStub.class);

        // stub the pull call so every pull returns the current contents of 'messages'
        UnaryCallable<PullRequest, PullResponse> callable = mock(UnaryCallable.class);
        PullResponse response = mock(PullResponse.class);

        when(subscriberMock.pullCallable()).thenReturn(callable);
        when(callable.call(any())).thenReturn(response);
        when(response.getReceivedMessagesList()).thenReturn(messages);

        // stub the acknowledge call so acks succeed without a real PubSub service
        UnaryCallable<AcknowledgeRequest, Empty> ackCallable = mock(UnaryCallable.class);
        when(subscriberMock.acknowledgeCallable()).thenReturn(ackCallable);
        when(ackCallable.call(any())).thenReturn(Empty.getDefaultInstance());

        // subclass the processor to inject the mocked subscriber while still resolving
        // the strategy and demarcator properties that onTrigger relies on
        runner = TestRunners.newTestRunner(new ConsumeGCPubSub() {
            @Override
            @OnScheduled
            public void onScheduled(ProcessContext context) {
                subscriber = subscriberMock;

                outputStrategy = context.getProperty(OUTPUT_STRATEGY).asAllowableValue(OutputStrategy.class);
                processingStrategy = context.getProperty(PROCESSING_STRATEGY).asAllowableValue(ProcessingStrategy.class);
                demarcatorValue = context.getProperty(MESSAGE_DEMARCATOR).getValue();
            }
        });

        runner.setProperty(ConsumeGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner));
        runner.setProperty(ConsumeGCPubSub.PROJECT_ID, PROJECT);
        runner.setProperty(ConsumeGCPubSub.SUBSCRIPTION, SUBSCRIPTION);

        messages.clear();
    }

    /**
     * Default strategy: one FlowFile per message, message attributes mapped to
     * FlowFile attributes.
     */
    @Test
    void testFlowFileStrategy() throws InitializationException {
        messages.add(createMessage("test1"));
        messages.add(createMessage("test2"));
        runner.run(1);
        runner.assertAllFlowFilesTransferred(ConsumeGCPubSub.REL_SUCCESS, 2);
        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConsumeGCPubSub.REL_SUCCESS).iterator().next();
        flowFile.assertContentEquals("test1");
        flowFile.assertAttributeExists(PubSubAttributes.MESSAGE_ID_ATTRIBUTE);
        flowFile.assertAttributeEquals(PubSubAttributes.SUBSCRIPTION_NAME_ATTRIBUTE, SUBSCRIPTION_FULL);
        flowFile.assertAttributeEquals("attKey", "attValue");
    }

    /**
     * Demarcator strategy: whole batch concatenated into a single FlowFile with the
     * demarcator appended after each message; per-message attributes are not set.
     */
    @Test
    void testDemarcatorStrategy() throws InitializationException {
        runner.setProperty(ConsumeGCPubSub.PROCESSING_STRATEGY, ProcessingStrategy.DEMARCATOR);
        runner.setProperty(ConsumeGCPubSub.MESSAGE_DEMARCATOR, "\n");

        messages.add(createMessage("test1"));
        messages.add(createMessage("test2"));
        runner.run(1);
        runner.assertAllFlowFilesTransferred(ConsumeGCPubSub.REL_SUCCESS, 1);
        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConsumeGCPubSub.REL_SUCCESS).iterator().next();
        flowFile.assertContentEquals("test1\ntest2\n");
        flowFile.assertAttributeNotExists(PubSubAttributes.MESSAGE_ID_ATTRIBUTE);
        flowFile.assertAttributeEquals(PubSubAttributes.SUBSCRIPTION_NAME_ATTRIBUTE, SUBSCRIPTION_FULL);
    }

    /**
     * Record strategy without wrapper: parseable messages are merged into one record
     * FlowFile; unparseable ones are routed individually to parse.failure with their
     * original content and attributes.
     */
    @Test
    void testRecordStrategyNoWrapper() throws InitializationException {
        runner.setProperty(ConsumeGCPubSub.PROCESSING_STRATEGY, ProcessingStrategy.RECORD);

        final JsonRecordSetWriter writer = new JsonRecordSetWriter();
        runner.addControllerService("json-writer", writer);
        runner.enableControllerService(writer);
        runner.setProperty(ConsumeGCPubSub.RECORD_WRITER, "json-writer");

        final JsonTreeReader reader = new JsonTreeReader();
        runner.addControllerService("json-reader", reader);
        runner.enableControllerService(reader);
        runner.setProperty(ConsumeGCPubSub.RECORD_READER, "json-reader");

        // mix of valid JSON payloads and non-JSON payloads to exercise both relationships
        messages.add(createMessage("{\"foo\":\"foo1\"}"));
        messages.add(createMessage("test2"));
        messages.add(createMessage("{\"foo\":\"foo2\"}"));
        messages.add(createMessage("test3"));
        runner.run(1);

        runner.assertTransferCount(ConsumeGCPubSub.REL_SUCCESS, 1);
        runner.assertTransferCount(ConsumeGCPubSub.REL_PARSE_FAILURE, 2);

        final MockFlowFile flowFileSuccess = runner.getFlowFilesForRelationship(ConsumeGCPubSub.REL_SUCCESS).iterator().next();
        flowFileSuccess.assertContentEquals("[{\"foo\":\"foo1\"},{\"foo\":\"foo2\"}]");
        flowFileSuccess.assertAttributeNotExists(PubSubAttributes.MESSAGE_ID_ATTRIBUTE);
        flowFileSuccess.assertAttributeEquals(PubSubAttributes.SUBSCRIPTION_NAME_ATTRIBUTE, SUBSCRIPTION_FULL);

        final MockFlowFile flowFileParseFailure = runner.getFlowFilesForRelationship(ConsumeGCPubSub.REL_PARSE_FAILURE).iterator().next();
        flowFileParseFailure.assertContentEquals("test2");
        flowFileParseFailure.assertAttributeExists(PubSubAttributes.MESSAGE_ID_ATTRIBUTE);
        flowFileParseFailure.assertAttributeEquals(PubSubAttributes.SUBSCRIPTION_NAME_ATTRIBUTE, SUBSCRIPTION_FULL);
        flowFileParseFailure.assertAttributeEquals("attKey", "attValue");
    }

    /**
     * Record strategy with USE_WRAPPER: each output record nests metadata, attributes
     * and the payload value. JSON is compared as trees so formatting differences do
     * not matter.
     */
    @Test
    void testRecordStrategyWithWrapper() throws InitializationException, JsonMappingException, JsonProcessingException {
        runner.setProperty(ConsumeGCPubSub.PROCESSING_STRATEGY, ProcessingStrategy.RECORD);
        runner.setProperty(ConsumeGCPubSub.OUTPUT_STRATEGY, OutputStrategy.USE_WRAPPER);

        final JsonRecordSetWriter writer = new JsonRecordSetWriter();
        runner.addControllerService("json-writer", writer);
        runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true");
        runner.enableControllerService(writer);
        runner.setProperty(ConsumeGCPubSub.RECORD_WRITER, "json-writer");

        final JsonTreeReader reader = new JsonTreeReader();
        runner.addControllerService("json-reader", reader);
        runner.enableControllerService(reader);
        runner.setProperty(ConsumeGCPubSub.RECORD_READER, "json-reader");

        messages.add(createMessage("{\"foo\":\"foo1\"}"));
        messages.add(createMessage("test2"));
        messages.add(createMessage("{\"foo\":\"foo2\"}"));
        messages.add(createMessage("test3"));
        runner.run(1);

        runner.assertTransferCount(ConsumeGCPubSub.REL_SUCCESS, 1);
        runner.assertTransferCount(ConsumeGCPubSub.REL_PARSE_FAILURE, 2);

        final String expected = """
                [ {
                  "metadata" : {
                    "gcp.pubsub.ackId" : "ackId",
                    "gcp.pubsub.messageSize" : 56,
                    "gcp.pubsub.messageId" : "messageId",
                    "gcp.pubsub.attributesCount" : 1,
                    "gcp.pubsub.publishTime" : 0
                  },
                  "attributes" : {
                    "attKey" : "attValue"
                  },
                  "value" : {
                    "foo" : "foo1"
                  }
                }, {
                  "metadata" : {
                    "gcp.pubsub.ackId" : "ackId",
                    "gcp.pubsub.messageSize" : 56,
                    "gcp.pubsub.messageId" : "messageId",
                    "gcp.pubsub.attributesCount" : 1,
                    "gcp.pubsub.publishTime" : 0
                  },
                  "attributes" : {
                    "attKey" : "attValue"
                  },
                  "value" : {
                    "foo" : "foo2"
                  }
                } ]""";

        final MockFlowFile flowFileSuccess = runner.getFlowFilesForRelationship(ConsumeGCPubSub.REL_SUCCESS).iterator().next();
        final String content = flowFileSuccess.getContent();
        assertEquals(mapper.readTree(content), mapper.readTree(expected));
        flowFileSuccess.assertAttributeNotExists(PubSubAttributes.MESSAGE_ID_ATTRIBUTE);
        flowFileSuccess.assertAttributeEquals(PubSubAttributes.SUBSCRIPTION_NAME_ATTRIBUTE, SUBSCRIPTION_FULL);

        final MockFlowFile flowFileParseFailure = runner.getFlowFilesForRelationship(ConsumeGCPubSub.REL_PARSE_FAILURE).iterator().next();
        flowFileParseFailure.assertContentEquals("test2");
        flowFileParseFailure.assertAttributeExists(PubSubAttributes.MESSAGE_ID_ATTRIBUTE);
        flowFileParseFailure.assertAttributeEquals(PubSubAttributes.SUBSCRIPTION_NAME_ATTRIBUTE, SUBSCRIPTION_FULL);
        flowFileParseFailure.assertAttributeEquals("attKey", "attValue");
    }

    // builds a ReceivedMessage with a fixed ack id, message id and a single attribute
    private ReceivedMessage createMessage(String content) {
        final byte[] data = content.getBytes();
        return ReceivedMessage.newBuilder()
                .setMessage(PubsubMessage.newBuilder()
                        .setData(ByteString.copyFrom(data))
                        .putAttributes("attKey", "attValue")
                        .setMessageId("messageId")
                        .build())
                .setAckId("ackId")
                .build();
    }

    // registers a mocked GCP credentials controller service and returns its id
    private static String getCredentialsServiceId(final TestRunner runner) throws InitializationException {
        final ControllerService controllerService = mock(GCPCredentialsControllerService.class);
        final String controllerServiceId = GCPCredentialsControllerService.class.getSimpleName();
        when(controllerService.getIdentifier()).thenReturn(controllerServiceId);
        runner.addControllerService(controllerServiceId, controllerService);
        runner.enableControllerService(controllerService);
        return controllerServiceId;
    }
}