This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 77e51cb7fa NIFI-15210 Fixed verify method of GCS processors and other GCP fixes
77e51cb7fa is described below

commit 77e51cb7fa615364836c55f56e8828214934c83b
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Wed Nov 12 17:22:29 2025 +0100

    NIFI-15210 Fixed verify method of GCS processors and other GCP fixes
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #10519.
---
 .../processors/gcp/pubsub/ConsumeGCPubSub.java     |  900 ++++++++---------
 .../processors/gcp/pubsub/PublishGCPubSub.java     | 1008 ++++++++++----------
 .../gcp/storage/AbstractGCSProcessor.java          |   12 +-
 .../processors/gcp/storage/DeleteGCSObject.java    |    6 +-
 .../processors/gcp/storage/FetchGCSObject.java     |    5 +-
 .../nifi/processors/gcp/storage/ListGCSBucket.java |   13 -
 .../nifi/processors/gcp/storage/PutGCSObject.java  |    5 +-
 .../gcp/drive/AbstractGoogleDriveIT.java           |    2 +
 .../processors/gcp/pubsub/AbstractGCPubSubIT.java  |   95 +-
 .../nifi/processors/gcp/storage/AbstractGCSIT.java |    3 +
 .../processors/gcp/storage/FetchGCSObjectIT.java   |    4 +-
 11 files changed, 1023 insertions(+), 1030 deletions(-)

diff --git a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
index 5e118fe617..b85e99884b 100644
--- a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
+++ b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
@@ -1,450 +1,450 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.gcp.pubsub;
-
-import com.google.api.gax.core.FixedCredentialsProvider;
-import com.google.api.gax.rpc.ApiException;
-import com.google.api.pathtemplate.ValidationException;
-import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
-import com.google.cloud.pubsub.v1.stub.SubscriberStub;
-import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
-import com.google.iam.v1.TestIamPermissionsRequest;
-import com.google.pubsub.v1.AcknowledgeRequest;
-import com.google.pubsub.v1.ProjectSubscriptionName;
-import com.google.pubsub.v1.PullRequest;
-import com.google.pubsub.v1.PullResponse;
-import com.google.pubsub.v1.ReceivedMessage;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.ConfigVerificationResult;
-import org.apache.nifi.components.ConfigVerificationResult.Outcome;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.migration.PropertyConfiguration;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.gcp.pubsub.consume.OutputStrategy;
-import org.apache.nifi.processors.gcp.pubsub.consume.ProcessingStrategy;
-import org.apache.nifi.processors.gcp.pubsub.consume.PubSubMessageConverter;
-import 
org.apache.nifi.processors.gcp.pubsub.consume.RecordStreamPubSubMessageConverter;
-import 
org.apache.nifi.processors.gcp.pubsub.consume.WrapperRecordStreamPubSubMessageConverter;
-import org.apache.nifi.serialization.RecordReaderFactory;
-import org.apache.nifi.serialization.RecordSetWriterFactory;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_DESCRIPTION;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_DESCRIPTION;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SUBSCRIPTION_NAME_ATTRIBUTE;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SUBSCRIPTION_NAME_DESCRIPTION;
-
-@SeeAlso({ PublishGCPubSub.class })
-@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
-@Tags({ "google", "google-cloud", "gcp", "message", "pubsub", "consume" })
-@CapabilityDescription(
-    """
-     Consumes messages from the configured Google Cloud PubSub subscription. 
The 'Batch Size' property specified the maximum
-     number of messages that will be pulled from the subscription in a single 
request. The 'Processing Strategy' property
-     specifies if each message should be its own FlowFile or if messages 
should be grouped into a single FlowFile. Using the
-     Demarcator strategy will provide best throughput when the format allows 
it. Using Record allows to convert data format
-     as well as doing schema enforcement. Using the FlowFile strategy will 
generate one FlowFile per message and will have
-     the message's attributes as FlowFile attributes.
-    """
-)
-@WritesAttributes(
-    {
-            @WritesAttribute(attribute = ACK_ID_ATTRIBUTE, description = 
ACK_ID_DESCRIPTION),
-            @WritesAttribute(attribute = SERIALIZED_SIZE_ATTRIBUTE, 
description = SERIALIZED_SIZE_DESCRIPTION),
-            @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE, 
description = MSG_ATTRIBUTES_COUNT_DESCRIPTION),
-            @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, 
description = MSG_PUBLISH_TIME_DESCRIPTION),
-            @WritesAttribute(attribute = SUBSCRIPTION_NAME_ATTRIBUTE, 
description = SUBSCRIPTION_NAME_DESCRIPTION),
-            @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, 
description = DYNAMIC_ATTRIBUTES_DESCRIPTION)
-    }
-)
-public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
-
-    private static final List<String> REQUIRED_PERMISSIONS = 
Collections.singletonList("pubsub.subscriptions.consume");
-
-    public static final PropertyDescriptor SUBSCRIPTION = new 
PropertyDescriptor.Builder()
-            .name("Subscription")
-            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
-            .description("Name of the Google Cloud Pub/Sub Subscription")
-            .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
-            .build();
-
-    public static final PropertyDescriptor PROCESSING_STRATEGY = new 
PropertyDescriptor.Builder()
-            .name("Processing Strategy")
-            .description("Strategy for processing PubSub Records and writing 
serialized output to FlowFiles")
-            .required(true)
-            .allowableValues(ProcessingStrategy.class)
-            .defaultValue(ProcessingStrategy.FLOW_FILE.getValue())
-            .expressionLanguageSupported(NONE)
-            .build();
-
-    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
-            .name("Record Reader")
-            .description("The Record Reader to use for incoming messages")
-            .identifiesControllerService(RecordReaderFactory.class)
-            .required(true)
-            .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD)
-            .build();
-
-    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
-            .name("Record Writer")
-            .description("The Record Writer to use in order to serialize the 
outgoing FlowFiles")
-            .identifiesControllerService(RecordSetWriterFactory.class)
-            .required(true)
-            .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD)
-            .build();
-
-    public static final PropertyDescriptor OUTPUT_STRATEGY = new 
PropertyDescriptor.Builder()
-            .name("Output Strategy")
-            .description("The format used to output the Kafka Record into a 
FlowFile Record.")
-            .required(true)
-            .defaultValue(OutputStrategy.USE_VALUE)
-            .allowableValues(OutputStrategy.class)
-            .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD)
-            .build();
-
-    public static final PropertyDescriptor MESSAGE_DEMARCATOR = new 
PropertyDescriptor.Builder()
-            .name("Message Demarcator")
-            .required(true)
-            .addValidator(Validator.VALID)
-            .description(
-                """
-                Since the PubSub client receives messages in batches, this 
Processor has an option to output FlowFiles
-                which contains all the messages in a single batch. This 
property allows you to provide a string
-                (interpreted as UTF-8) to use for demarcating apart multiple 
messages. To enter special character
-                such as 'new line' use CTRL+Enter or Shift+Enter depending on 
the OS.
-                """)
-            .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.DEMARCATOR)
-            .build();
-
-    public static final Relationship REL_PARSE_FAILURE = new 
Relationship.Builder()
-            .name("parse failure")
-            .description("If configured to use a Record Reader, a PubSub 
message that cannot be parsed using the configured Record Reader will be routed 
to this relationship")
-            .build();
-
-    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(
-            GCP_CREDENTIALS_PROVIDER_SERVICE,
-            PROJECT_ID,
-            SUBSCRIPTION,
-            BATCH_SIZE_THRESHOLD,
-            PROCESSING_STRATEGY,
-            RECORD_READER,
-            RECORD_WRITER,
-            OUTPUT_STRATEGY,
-            MESSAGE_DEMARCATOR,
-            API_ENDPOINT,
-            PROXY_CONFIGURATION_SERVICE);
-
-    private static final Set<Relationship> SUCCESS_RELATIONSHIP = 
Set.of(REL_SUCCESS);
-    private static final Set<Relationship> SUCCESS_FAILURE_RELATIONSHIPS = 
Set.of(REL_SUCCESS, REL_PARSE_FAILURE);
-
-    protected SubscriberStub subscriber = null;
-    private PullRequest pullRequest;
-    protected volatile OutputStrategy outputStrategy;
-    protected volatile ProcessingStrategy processingStrategy;
-    private volatile boolean useReader;
-    protected volatile String demarcatorValue;
-
-    private final AtomicReference<Exception> storedException = new 
AtomicReference<>();
-
-    @Override
-    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return PROPERTY_DESCRIPTORS;
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return useReader ? SUCCESS_FAILURE_RELATIONSHIPS : 
SUCCESS_RELATIONSHIP;
-    }
-
-    @Override
-    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
-        if (descriptor.equals(RECORD_READER)) {
-            useReader = newValue != null;
-        }
-    }
-
-    @Override
-    @OnScheduled
-    public void onScheduled(ProcessContext context) {
-        final Integer batchSize = 
context.getProperty(BATCH_SIZE_THRESHOLD).asInteger();
-
-        pullRequest = PullRequest.newBuilder()
-                .setMaxMessages(batchSize)
-                .setSubscription(getSubscriptionName(context, null))
-                .build();
-
-        try {
-            subscriber = getSubscriber(context);
-        } catch (IOException e) {
-            storedException.set(e);
-            getLogger().error("Failed to create Google Cloud Subscriber", e);
-        }
-
-        processingStrategy = 
context.getProperty(PROCESSING_STRATEGY).asAllowableValue(ProcessingStrategy.class);
-        outputStrategy = processingStrategy == ProcessingStrategy.RECORD ? 
context.getProperty(OUTPUT_STRATEGY).asAllowableValue(OutputStrategy.class) : 
null;
-        demarcatorValue = processingStrategy == ProcessingStrategy.DEMARCATOR 
? context.getProperty(MESSAGE_DEMARCATOR).getValue() : null;
-    }
-
-    @Override
-    public List<ConfigVerificationResult> verify(final ProcessContext context, 
final ComponentLog verificationLogger, final Map<String, String> attributes) {
-        final List<ConfigVerificationResult> results = new ArrayList<>();
-        String subscriptionName = null;
-        try {
-            subscriptionName = getSubscriptionName(context, attributes);
-            results.add(new ConfigVerificationResult.Builder()
-                    .verificationStepName("Parse Subscription Name")
-                    .outcome(Outcome.SUCCESSFUL)
-                    .explanation("Successfully parsed Subscription Name")
-                    .build());
-        } catch (final ValidationException e) {
-            verificationLogger.error("Failed to parse Subscription Name", e);
-            results.add(new ConfigVerificationResult.Builder()
-                    .verificationStepName("Parse Subscription Name")
-                    .outcome(Outcome.FAILED)
-                    .explanation(String.format("Failed to parse Subscription 
Name: " + e.getMessage()))
-                    .build());
-        }
-        SubscriberStub subscriber = null;
-        try {
-            subscriber = getSubscriber(context);
-            results.add(new ConfigVerificationResult.Builder()
-                    .verificationStepName("Create Subscriber")
-                    .outcome(Outcome.SUCCESSFUL)
-                    .explanation("Successfully created Subscriber")
-                    .build());
-        } catch (final IOException e) {
-            verificationLogger.error("Failed to create Subscriber", e);
-            results.add(new ConfigVerificationResult.Builder()
-                    .verificationStepName("Create Subscriber")
-                    .outcome(Outcome.FAILED)
-                    .explanation(String.format("Failed to create Subscriber: " 
+ e.getMessage()))
-                    .build());
-        }
-
-        if (subscriber != null && subscriptionName != null) {
-            try {
-                final TestIamPermissionsRequest request = 
TestIamPermissionsRequest.newBuilder().addAllPermissions(REQUIRED_PERMISSIONS).setResource(subscriptionName).build();
-                if 
(subscriber.testIamPermissionsCallable().call(request).getPermissionsCount() >= 
REQUIRED_PERMISSIONS.size()) {
-                    results.add(new ConfigVerificationResult.Builder()
-                            .verificationStepName("Test IAM Permissions")
-                            
.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
-                            .explanation(String.format("Verified Subscription 
[%s] exists and the configured user has the correct permissions.", 
subscriptionName))
-                            .build());
-                } else {
-                    results.add(new ConfigVerificationResult.Builder()
-                            .verificationStepName("Test IAM Permissions")
-                            .outcome(ConfigVerificationResult.Outcome.FAILED)
-                            .explanation(String.format("The configured user 
does not have the correct permissions on Subscription [%s].", subscriptionName))
-                            .build());
-                }
-            } catch (final ApiException e) {
-                verificationLogger.error("The configured user appears to have 
the correct permissions, but the following error was encountered", e);
-                results.add(new ConfigVerificationResult.Builder()
-                        .verificationStepName("Test IAM Permissions")
-                        .outcome(ConfigVerificationResult.Outcome.FAILED)
-                        .explanation(String.format("The configured user 
appears to have the correct permissions, but the following error was 
encountered: " + e.getMessage()))
-                        .build());
-            }
-        }
-        return results;
-    }
-
-    @OnStopped
-    public void onStopped() {
-        if (subscriber != null) {
-            subscriber.shutdown();
-        }
-    }
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        if (subscriber == null) {
-            if (storedException.get() != null) {
-                getLogger().error("Failed to create Google Cloud PubSub 
subscriber due to {}", storedException.get());
-            } else {
-                getLogger().error("Google Cloud PubSub Subscriber was not 
properly created. Yielding the processor...");
-            }
-
-            context.yield();
-            return;
-        }
-
-        final PullResponse pullResponse = 
subscriber.pullCallable().call(pullRequest);
-        final List<String> ackIds = new ArrayList<>();
-        final String subscriptionName = getSubscriptionName(context, null);
-        List<ReceivedMessage> receivedMessages = 
pullResponse.getReceivedMessagesList();
-
-        switch (processingStrategy) {
-            case RECORD -> processInputRecords(context, session, 
receivedMessages, subscriptionName, ackIds);
-            case FLOW_FILE -> processInputFlowFile(session, receivedMessages, 
subscriptionName, ackIds);
-            case DEMARCATOR -> processInputDemarcator(session, 
receivedMessages, subscriptionName, ackIds);
-        }
-
-        session.commitAsync(() -> acknowledgeAcks(ackIds, subscriptionName));
-    }
-
-    @Override
-    public void migrateProperties(PropertyConfiguration config) {
-        super.migrateProperties(config);
-        config.renameProperty("gcp-pubsub-subscription", 
SUBSCRIPTION.getName());
-    }
-
-    private void processInputDemarcator(final ProcessSession session, final 
List<ReceivedMessage> receivedMessages, final String subscriptionName,
-                                        final List<String> ackIds) {
-        final byte[] demarcator = demarcatorValue == null ? new byte[0] : 
demarcatorValue.getBytes(StandardCharsets.UTF_8);
-        FlowFile flowFile = session.create();
-
-        try {
-            flowFile = session.write(flowFile, new OutputStreamCallback() {
-                @Override
-                public void process(OutputStream out) throws IOException {
-                    for (ReceivedMessage message : receivedMessages) {
-                        if (message.hasMessage()) {
-                            
out.write(message.getMessage().getData().toByteArray());
-                            out.write(demarcator);
-                            ackIds.add(message.getAckId());
-                        }
-                    }
-                }
-            });
-            session.putAttribute(flowFile, SUBSCRIPTION_NAME_ATTRIBUTE, 
subscriptionName);
-        } catch (Exception e) {
-            ackIds.clear();
-            session.remove(flowFile);
-            throw new ProcessException("Failed to write batch of messages in 
FlowFile", e);
-        }
-
-        if (flowFile.getSize() > 0) {
-            session.putAttribute(flowFile, "record.count", 
String.valueOf(ackIds.size()));
-            session.transfer(flowFile, REL_SUCCESS);
-            session.getProvenanceReporter().receive(flowFile, 
subscriptionName);
-            session.adjustCounter("Records Received from " + subscriptionName, 
ackIds.size(), false);
-        } else {
-            session.remove(flowFile);
-        }
-    }
-
-    private void processInputFlowFile(final ProcessSession session, final 
List<ReceivedMessage> receivedMessages, final String subscriptionName, final 
List<String> ackIds) {
-        for (ReceivedMessage message : receivedMessages) {
-            if (message.hasMessage()) {
-                FlowFile flowFile = session.create();
-
-                final Map<String, String> attributes = new HashMap<>();
-                attributes.put(ACK_ID_ATTRIBUTE, message.getAckId());
-                attributes.put(SERIALIZED_SIZE_ATTRIBUTE, 
String.valueOf(message.getSerializedSize()));
-                attributes.put(MESSAGE_ID_ATTRIBUTE, 
message.getMessage().getMessageId());
-                attributes.put(MSG_ATTRIBUTES_COUNT_ATTRIBUTE, 
String.valueOf(message.getMessage().getAttributesCount()));
-                attributes.put(MSG_PUBLISH_TIME_ATTRIBUTE, 
String.valueOf(message.getMessage().getPublishTime().getSeconds()));
-                attributes.put(SUBSCRIPTION_NAME_ATTRIBUTE, subscriptionName);
-                attributes.putAll(message.getMessage().getAttributesMap());
-
-                flowFile = session.putAllAttributes(flowFile, attributes);
-                flowFile = session.write(flowFile, out -> 
out.write(message.getMessage().getData().toByteArray()));
-
-                ackIds.add(message.getAckId());
-                session.transfer(flowFile, REL_SUCCESS);
-                session.getProvenanceReporter().receive(flowFile, 
subscriptionName);
-            }
-        }
-    }
-
-    private void processInputRecords(final ProcessContext context, final 
ProcessSession session, final List<ReceivedMessage> receivedMessages, final 
String subscriptionName,
-            final List<String> ackIds) {
-        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
-        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-
-        PubSubMessageConverter converter = switch (outputStrategy) {
-            case USE_VALUE -> new 
RecordStreamPubSubMessageConverter(readerFactory, writerFactory, getLogger());
-            case USE_WRAPPER -> new 
WrapperRecordStreamPubSubMessageConverter(readerFactory, writerFactory, 
getLogger());
-        };
-
-        converter.toFlowFiles(session, receivedMessages, ackIds, 
subscriptionName);
-    }
-
-    private void acknowledgeAcks(final Collection<String> ackIds, final String 
subscriptionName) {
-        if (ackIds == null || ackIds.isEmpty()) {
-            return;
-        }
-
-        AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()
-                .addAllAckIds(ackIds)
-                .setSubscription(subscriptionName)
-                .build();
-        subscriber.acknowledgeCallable().call(acknowledgeRequest);
-    }
-
-    private String getSubscriptionName(final ProcessContext context, final 
Map<String, String> additionalAttributes) {
-        final String subscriptionName = 
context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions(additionalAttributes).getValue();
-        final String projectId = 
context.getProperty(PROJECT_ID).evaluateAttributeExpressions(additionalAttributes).getValue();
-
-        if (subscriptionName.contains("/")) {
-            return ProjectSubscriptionName.parse(subscriptionName).toString();
-        } else {
-            return ProjectSubscriptionName.of(projectId, 
subscriptionName).toString();
-        }
-
-    }
-
-    protected SubscriberStub getSubscriber(final ProcessContext context) 
throws IOException {
-        final String endpoint = context.getProperty(API_ENDPOINT).getValue();
-
-        final SubscriberStubSettings.Builder subscriberBuilder = 
SubscriberStubSettings.newBuilder()
-                
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
-                
.setTransportChannelProvider(getTransportChannelProvider(context))
-                .setEndpoint(endpoint);
-
-        return GrpcSubscriberStub.create(subscriberBuilder.build());
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.pathtemplate.ValidationException;
+import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.iam.v1.TestIamPermissionsRequest;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.migration.PropertyConfiguration;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.pubsub.consume.OutputStrategy;
+import org.apache.nifi.processors.gcp.pubsub.consume.ProcessingStrategy;
+import org.apache.nifi.processors.gcp.pubsub.consume.PubSubMessageConverter;
+import 
org.apache.nifi.processors.gcp.pubsub.consume.RecordStreamPubSubMessageConverter;
+import 
org.apache.nifi.processors.gcp.pubsub.consume.WrapperRecordStreamPubSubMessageConverter;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_DESCRIPTION;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_DESCRIPTION;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SUBSCRIPTION_NAME_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SUBSCRIPTION_NAME_DESCRIPTION;
+
+@SeeAlso({ PublishGCPubSub.class })
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({ "google", "google-cloud", "gcp", "message", "pubsub", "consume" })
+// User-facing summary of the processor, supplied through NiFi's @CapabilityDescription documentation annotation.
+@CapabilityDescription(
+    """
+     Consumes messages from the configured Google Cloud PubSub subscription. The 'Batch Size' property specifies the maximum
+     number of messages that will be pulled from the subscription in a single request. The 'Processing Strategy' property
+     specifies if each message should be its own FlowFile or if messages should be grouped into a single FlowFile. Using the
+     Demarcator strategy will provide best throughput when the format allows it. Using Record allows converting the data format
+     as well as doing schema enforcement. Using the FlowFile strategy will generate one FlowFile per message and will have
+     the message's attributes as FlowFile attributes.
+    """
+)
+@WritesAttributes(
+    {
+            @WritesAttribute(attribute = ACK_ID_ATTRIBUTE, description = 
ACK_ID_DESCRIPTION),
+            @WritesAttribute(attribute = SERIALIZED_SIZE_ATTRIBUTE, 
description = SERIALIZED_SIZE_DESCRIPTION),
+            @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE, 
description = MSG_ATTRIBUTES_COUNT_DESCRIPTION),
+            @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, 
description = MSG_PUBLISH_TIME_DESCRIPTION),
+            @WritesAttribute(attribute = SUBSCRIPTION_NAME_ATTRIBUTE, 
description = SUBSCRIPTION_NAME_DESCRIPTION),
+            @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, 
description = DYNAMIC_ATTRIBUTES_DESCRIPTION)
+    }
+)
+public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
+
+    // IAM permissions that verify() tests against the resolved subscription.
+    private static final List<String> REQUIRED_PERMISSIONS = 
Collections.singletonList("pubsub.subscriptions.consume");
+
+    // Required subscription identifier; Expression Language is evaluated at ENVIRONMENT scope.
+    // getSubscriptionName() treats a value containing '/' as a full subscription path and any other
+    // value as a name to be combined with the Project ID property.
+    public static final PropertyDescriptor SUBSCRIPTION = new 
PropertyDescriptor.Builder()
+            .name("Subscription")
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .description("Name of the Google Cloud Pub/Sub Subscription")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .build();
+
+    // Selects one of the ProcessingStrategy values (default FLOW_FILE); onTrigger() dispatches on it.
+    public static final PropertyDescriptor PROCESSING_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("Processing Strategy")
+            .description("Strategy for processing PubSub Records and writing 
serialized output to FlowFiles")
+            .required(true)
+            .allowableValues(ProcessingStrategy.class)
+            .defaultValue(ProcessingStrategy.FLOW_FILE.getValue())
+            .expressionLanguageSupported(NONE)
+            .build();
+
+    // Reader controller service; only applicable when the Processing Strategy is RECORD.
+    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("Record Reader")
+            .description("The Record Reader to use for incoming messages")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD)
+            .build();
+
+    // Writer controller service; only applicable when the Processing Strategy is RECORD.
+    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("Record Writer")
+            .description("The Record Writer to use in order to serialize the 
outgoing FlowFiles")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(true)
+            .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD)
+            .build();
+
+    // Selects how a consumed message is represented in the output record (default USE_VALUE);
+    // only applicable when the Processing Strategy is RECORD.
+    public static final PropertyDescriptor OUTPUT_STRATEGY = new PropertyDescriptor.Builder()
+            .name("Output Strategy")
+            .description("The format used to output the PubSub message into a FlowFile Record.")
+            .required(true)
+            .defaultValue(OutputStrategy.USE_VALUE)
+            .allowableValues(OutputStrategy.class)
+            .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD)
+            .build();
+
+    // Separator written after each message by processInputDemarcator(); only applicable when the
+    // Processing Strategy is DEMARCATOR.
+    public static final PropertyDescriptor MESSAGE_DEMARCATOR = new 
PropertyDescriptor.Builder()
+            .name("Message Demarcator")
+            .required(true)
+            .addValidator(Validator.VALID)
+            .description(
+                """
+                Since the PubSub client receives messages in batches, this 
Processor has an option to output FlowFiles
+                which contains all the messages in a single batch. This 
property allows you to provide a string
+                (interpreted as UTF-8) to use for demarcating apart multiple 
messages. To enter special character
+                such as 'new line' use CTRL+Enter or Shift+Enter depending on 
the OS.
+                """)
+            .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.DEMARCATOR)
+            .build();
+
+    // Exposed by getRelationships() only while a Record Reader is configured (see useReader).
+    public static final Relationship REL_PARSE_FAILURE = new 
Relationship.Builder()
+            .name("parse failure")
+            .description("If configured to use a Record Reader, a PubSub 
message that cannot be parsed using the configured Record Reader will be routed 
to this relationship")
+            .build();
+
+    // Properties in the order returned by getSupportedPropertyDescriptors().
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(
+            GCP_CREDENTIALS_PROVIDER_SERVICE,
+            PROJECT_ID,
+            SUBSCRIPTION,
+            BATCH_SIZE_THRESHOLD,
+            PROCESSING_STRATEGY,
+            RECORD_READER,
+            RECORD_WRITER,
+            OUTPUT_STRATEGY,
+            MESSAGE_DEMARCATOR,
+            API_ENDPOINT,
+            PROXY_CONFIGURATION_SERVICE);
+
+    // The two relationship sets getRelationships() chooses between.
+    private static final Set<Relationship> SUCCESS_RELATIONSHIP = 
Set.of(REL_SUCCESS);
+    private static final Set<Relationship> SUCCESS_FAILURE_RELATIONSHIPS = 
Set.of(REL_SUCCESS, REL_PARSE_FAILURE);
+
+    // Stub created in onScheduled() and shut down in onStopped(); onTrigger() yields while it is null.
+    // NOTE(review): unlike the strategy fields below, subscriber and pullRequest are not volatile -
+    // confirm the framework's scheduling guarantees make the onScheduled() writes visible to onTrigger().
+    protected SubscriberStub subscriber = null;
+    // Built once in onScheduled() and reused for every pull in onTrigger().
+    private PullRequest pullRequest;
+    // Property values cached by onScheduled(); outputStrategy and demarcatorValue are null unless
+    // their processing strategy (RECORD / DEMARCATOR respectively) is selected.
+    protected volatile OutputStrategy outputStrategy;
+    protected volatile ProcessingStrategy processingStrategy;
+    // Updated by onPropertyModified(): true while the Record Reader property has a value.
+    private volatile boolean useReader;
+    protected volatile String demarcatorValue;
+
+    // Holds the IOException caught in onScheduled() so onTrigger() can report it.
+    private final AtomicReference<Exception> storedException = new 
AtomicReference<>();
+
+    /** Returns the fixed, ordered list of properties held in {@code PROPERTY_DESCRIPTORS}. */
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    /**
+     * Returns the relationships currently offered by the processor.
+     *
+     * @return success plus "parse failure" while a Record Reader is configured, otherwise success only
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        if (useReader) {
+            return SUCCESS_FAILURE_RELATIONSHIPS;
+        }
+        return SUCCESS_RELATIONSHIP;
+    }
+
+    /**
+     * Tracks whether a Record Reader is configured so that {@code getRelationships()} can decide
+     * whether to expose the parse-failure relationship. Changes to any other property are ignored.
+     *
+     * @param descriptor the property that changed
+     * @param oldValue the previous value (unused)
+     * @param newValue the new value, or {@code null} when the property was cleared
+     */
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+        if (!descriptor.equals(RECORD_READER)) {
+            return;
+        }
+        useReader = newValue != null;
+    }
+
+    /**
+     * Prepares the processor for execution: builds the reusable pull request, creates the subscriber
+     * stub and caches the strategy-related property values.
+     * <p>
+     * A failure to create the subscriber is not rethrown. It is logged and remembered in
+     * {@code storedException} so that {@code onTrigger} can report it and yield.
+     *
+     * @param context the process context providing the configured property values
+     */
+    @Override
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        final Integer batchSize = context.getProperty(BATCH_SIZE_THRESHOLD).asInteger();
+
+        pullRequest = PullRequest.newBuilder()
+                .setMaxMessages(batchSize)
+                .setSubscription(getSubscriptionName(context, null))
+                .build();
+
+        // Forget a failure recorded by an earlier scheduling attempt so a stale error is never reported.
+        storedException.set(null);
+        try {
+            subscriber = getSubscriber(context);
+        } catch (IOException e) {
+            // Drop any stub left over from a previous run (onStopped has already shut it down) so that
+            // onTrigger takes its "subscriber == null" path instead of calling a dead stub.
+            subscriber = null;
+            storedException.set(e);
+            getLogger().error("Failed to create Google Cloud Subscriber", e);
+        }
+
+        processingStrategy = context.getProperty(PROCESSING_STRATEGY).asAllowableValue(ProcessingStrategy.class);
+        // The strategy-specific settings are only read when their strategy is selected; otherwise they stay null.
+        outputStrategy = processingStrategy == ProcessingStrategy.RECORD ? context.getProperty(OUTPUT_STRATEGY).asAllowableValue(OutputStrategy.class) : null;
+        demarcatorValue = processingStrategy == ProcessingStrategy.DEMARCATOR ? context.getProperty(MESSAGE_DEMARCATOR).getValue() : null;
+    }
+
+    /**
+     * Verifies the configuration in up to three steps: parsing the subscription name, creating a
+     * subscriber stub, and testing the required IAM permissions on the subscription. The permission
+     * test is only attempted when both earlier steps succeeded.
+     *
+     * @param context the process context holding the configuration to verify
+     * @param verificationLogger logger that receives the details of failed steps
+     * @param attributes attribute values used when evaluating Expression Language in the properties
+     * @return one result per verification step that was executed
+     */
+    @Override
+    public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
+        final List<ConfigVerificationResult> results = new ArrayList<>();
+        String subscriptionName = null;
+        try {
+            subscriptionName = getSubscriptionName(context, attributes);
+            results.add(new ConfigVerificationResult.Builder()
+                    .verificationStepName("Parse Subscription Name")
+                    .outcome(Outcome.SUCCESSFUL)
+                    .explanation("Successfully parsed Subscription Name")
+                    .build());
+        } catch (final ValidationException e) {
+            verificationLogger.error("Failed to parse Subscription Name", e);
+            // Plain concatenation: routing the exception message through String.format as the format
+            // string would throw as soon as the message contains a '%'.
+            results.add(new ConfigVerificationResult.Builder()
+                    .verificationStepName("Parse Subscription Name")
+                    .outcome(Outcome.FAILED)
+                    .explanation("Failed to parse Subscription Name: " + e.getMessage())
+                    .build());
+        }
+        SubscriberStub subscriber = null;
+        try {
+            subscriber = getSubscriber(context);
+            results.add(new ConfigVerificationResult.Builder()
+                    .verificationStepName("Create Subscriber")
+                    .outcome(Outcome.SUCCESSFUL)
+                    .explanation("Successfully created Subscriber")
+                    .build());
+        } catch (final IOException e) {
+            verificationLogger.error("Failed to create Subscriber", e);
+            results.add(new ConfigVerificationResult.Builder()
+                    .verificationStepName("Create Subscriber")
+                    .outcome(Outcome.FAILED)
+                    .explanation("Failed to create Subscriber: " + e.getMessage())
+                    .build());
+        }
+
+        if (subscriber != null) {
+            try {
+                if (subscriptionName != null) {
+                    final TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder()
+                            .addAllPermissions(REQUIRED_PERMISSIONS)
+                            .setResource(subscriptionName)
+                            .build();
+                    final int grantedPermissions = subscriber.testIamPermissionsCallable().call(request).getPermissionsCount();
+                    if (grantedPermissions >= REQUIRED_PERMISSIONS.size()) {
+                        results.add(new ConfigVerificationResult.Builder()
+                                .verificationStepName("Test IAM Permissions")
+                                .outcome(Outcome.SUCCESSFUL)
+                                .explanation(String.format("Verified Subscription [%s] exists and the configured user has the correct permissions.", subscriptionName))
+                                .build());
+                    } else {
+                        results.add(new ConfigVerificationResult.Builder()
+                                .verificationStepName("Test IAM Permissions")
+                                .outcome(Outcome.FAILED)
+                                .explanation(String.format("The configured user does not have the correct permissions on Subscription [%s].", subscriptionName))
+                                .build());
+                    }
+                }
+            } catch (final ApiException e) {
+                verificationLogger.error("The configured user appears to have the correct permissions, but the following error was encountered", e);
+                results.add(new ConfigVerificationResult.Builder()
+                        .verificationStepName("Test IAM Permissions")
+                        .outcome(Outcome.FAILED)
+                        .explanation("The configured user appears to have the correct permissions, but the following error was encountered: " + e.getMessage())
+                        .build());
+            } finally {
+                // This stub exists only for the verification run; shut it down so it does not outlive the call.
+                subscriber.shutdown();
+            }
+        }
+        return results;
+    }
+
+    /** Shuts down the subscriber stub when the processor is stopped, if one was created. */
+    @OnStopped
+    public void onStopped() {
+        if (subscriber == null) {
+            return;
+        }
+        subscriber.shutdown();
+    }
+
+    /**
+     * Pulls one batch of messages from the subscription, turns it into FlowFiles according to the
+     * configured processing strategy and acknowledges the consumed messages once the session commit
+     * has completed. When no subscriber is available the problem is logged and the processor yields.
+     *
+     * @param context the process context
+     * @param session the session used to create and transfer FlowFiles
+     * @throws ProcessException if the batch cannot be processed
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        if (subscriber == null) {
+            if (storedException.get() == null) {
+                getLogger().error("Google Cloud PubSub Subscriber was not properly created. Yielding the processor...");
+            } else {
+                getLogger().error("Failed to create Google Cloud PubSub subscriber due to {}", storedException.get());
+            }
+
+            context.yield();
+            return;
+        }
+
+        final PullResponse batch = subscriber.pullCallable().call(pullRequest);
+        final List<String> ackIds = new ArrayList<>();
+        final String subscriptionName = getSubscriptionName(context, null);
+        final List<ReceivedMessage> messages = batch.getReceivedMessagesList();
+
+        // Each strategy fills ackIds with the ids of the messages it actually wrote out.
+        switch (processingStrategy) {
+            case RECORD -> processInputRecords(context, session, messages, subscriptionName, ackIds);
+            case FLOW_FILE -> processInputFlowFile(session, messages, subscriptionName, ackIds);
+            case DEMARCATOR -> processInputDemarcator(session, messages, subscriptionName, ackIds);
+        }
+
+        // Acknowledge only from the commit callback passed to commitAsync.
+        session.commitAsync(() -> acknowledgeAcks(ackIds, subscriptionName));
+    }
+
+    /**
+     * Runs the parent class's property migration, then renames the legacy
+     * "gcp-pubsub-subscription" property to the current name of {@code SUBSCRIPTION}.
+     */
+    @Override
+    public void migrateProperties(PropertyConfiguration config) {
+        super.migrateProperties(config);
+        config.renameProperty("gcp-pubsub-subscription", 
SUBSCRIPTION.getName());
+    }
+
+    /**
+     * Writes every message of the batch into a single FlowFile, each followed by the configured
+     * demarcator, and routes it to success. If the batch yields no content the FlowFile is removed.
+     *
+     * @param session the session used to create, write and transfer the FlowFile
+     * @param receivedMessages the messages returned by the pull request
+     * @param subscriptionName the fully qualified subscription name, used for attributes and provenance
+     * @param ackIds output list that receives the ack id of every message written; cleared on failure
+     * @throws ProcessException if the batch cannot be written to the FlowFile
+     */
+    private void processInputDemarcator(final ProcessSession session, final List<ReceivedMessage> receivedMessages, final String subscriptionName,
+                                        final List<String> ackIds) {
+        final byte[] demarcator = demarcatorValue == null ? new byte[0] : demarcatorValue.getBytes(StandardCharsets.UTF_8);
+        FlowFile flowFile = session.create();
+
+        try {
+            flowFile = session.write(flowFile, out -> {
+                for (final ReceivedMessage message : receivedMessages) {
+                    if (message.hasMessage()) {
+                        out.write(message.getMessage().getData().toByteArray());
+                        out.write(demarcator);
+                        ackIds.add(message.getAckId());
+                    }
+                }
+            });
+            // Session operations return the updated FlowFile; keep that reference rather than the stale one.
+            flowFile = session.putAttribute(flowFile, SUBSCRIPTION_NAME_ATTRIBUTE, subscriptionName);
+        } catch (Exception e) {
+            // Nothing from this batch may be acknowledged if it could not be written out completely.
+            ackIds.clear();
+            session.remove(flowFile);
+            throw new ProcessException("Failed to write batch of messages in FlowFile", e);
+        }
+
+        if (flowFile.getSize() > 0) {
+            flowFile = session.putAttribute(flowFile, "record.count", String.valueOf(ackIds.size()));
+            session.transfer(flowFile, REL_SUCCESS);
+            session.getProvenanceReporter().receive(flowFile, subscriptionName);
+            session.adjustCounter("Records Received from " + subscriptionName, ackIds.size(), false);
+        } else {
+            session.remove(flowFile);
+        }
+    }
+
+    /**
+     * Creates one FlowFile per received message: the message data becomes the content, and the
+     * PubSub metadata plus the message's own attributes become FlowFile attributes.
+     *
+     * @param session the session used to create, write and transfer the FlowFiles
+     * @param receivedMessages the messages returned by the pull request
+     * @param subscriptionName the fully qualified subscription name, used for attributes and provenance
+     * @param ackIds output list that receives the ack id of every message turned into a FlowFile
+     */
+    private void processInputFlowFile(final ProcessSession session, final List<ReceivedMessage> receivedMessages, final String subscriptionName, final List<String> ackIds) {
+        for (final ReceivedMessage received : receivedMessages) {
+            if (!received.hasMessage()) {
+                continue;
+            }
+
+            final Map<String, String> flowFileAttributes = new HashMap<>();
+            flowFileAttributes.put(ACK_ID_ATTRIBUTE, received.getAckId());
+            flowFileAttributes.put(SERIALIZED_SIZE_ATTRIBUTE, String.valueOf(received.getSerializedSize()));
+            flowFileAttributes.put(MESSAGE_ID_ATTRIBUTE, received.getMessage().getMessageId());
+            flowFileAttributes.put(MSG_ATTRIBUTES_COUNT_ATTRIBUTE, String.valueOf(received.getMessage().getAttributesCount()));
+            flowFileAttributes.put(MSG_PUBLISH_TIME_ATTRIBUTE, String.valueOf(received.getMessage().getPublishTime().getSeconds()));
+            flowFileAttributes.put(SUBSCRIPTION_NAME_ATTRIBUTE, subscriptionName);
+            // Added last, so a message attribute with a colliding key replaces the value set above.
+            flowFileAttributes.putAll(received.getMessage().getAttributesMap());
+
+            FlowFile created = session.create();
+            created = session.putAllAttributes(created, flowFileAttributes);
+            created = session.write(created, out -> out.write(received.getMessage().getData().toByteArray()));
+
+            ackIds.add(received.getAckId());
+            session.transfer(created, REL_SUCCESS);
+            session.getProvenanceReporter().receive(created, subscriptionName);
+        }
+    }
+
+    /**
+     * Hands the batch to the message converter matching the configured output strategy, using the
+     * configured Record Reader and Record Writer services.
+     *
+     * @param context the process context providing the reader and writer services
+     * @param session the session passed on to the converter
+     * @param receivedMessages the messages returned by the pull request
+     * @param subscriptionName the fully qualified subscription name
+     * @param ackIds output list passed on to the converter
+     */
+    private void processInputRecords(final ProcessContext context, final ProcessSession session, final List<ReceivedMessage> receivedMessages, final String subscriptionName,
+            final List<String> ackIds) {
+        final RecordReaderFactory recordReader = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory recordWriter = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        final PubSubMessageConverter messageConverter = switch (outputStrategy) {
+            case USE_VALUE -> new RecordStreamPubSubMessageConverter(recordReader, recordWriter, getLogger());
+            case USE_WRAPPER -> new WrapperRecordStreamPubSubMessageConverter(recordReader, recordWriter, getLogger());
+        };
+        messageConverter.toFlowFiles(session, receivedMessages, ackIds, subscriptionName);
+    }
+
+    /**
+     * Acknowledges the given messages on the subscription. Does nothing when there are no ack ids.
+     *
+     * @param ackIds ack ids of the messages to acknowledge; may be {@code null} or empty
+     * @param subscriptionName the fully qualified subscription name the ids belong to
+     */
+    private void acknowledgeAcks(final Collection<String> ackIds, final String subscriptionName) {
+        if (ackIds != null && !ackIds.isEmpty()) {
+            final AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
+                    .addAllAckIds(ackIds)
+                    .setSubscription(subscriptionName)
+                    .build();
+            subscriber.acknowledgeCallable().call(request);
+        }
+    }
+
+    /**
+     * Resolves the fully qualified subscription name from the Subscription and Project ID properties.
+     * A value containing a slash is parsed as a complete subscription path; any other value is
+     * combined with the configured project id.
+     *
+     * @param context the process context providing the property values
+     * @param additionalAttributes attributes passed to the Expression Language evaluation; callers in this class also pass {@code null}
+     * @return the string form of the resolved subscription name
+     */
+    private String getSubscriptionName(final ProcessContext context, final Map<String, String> additionalAttributes) {
+        final String configuredSubscription = context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions(additionalAttributes).getValue();
+        final String configuredProject = context.getProperty(PROJECT_ID).evaluateAttributeExpressions(additionalAttributes).getValue();
+
+        final ProjectSubscriptionName resolved = configuredSubscription.contains("/")
+                ? ProjectSubscriptionName.parse(configuredSubscription)
+                : ProjectSubscriptionName.of(configuredProject, configuredSubscription);
+        return resolved.toString();
+    }
+
+    /**
+     * Creates a gRPC subscriber stub from the configured credentials, transport channel provider
+     * and API endpoint.
+     *
+     * @param context the process context providing the configuration
+     * @return a newly created subscriber stub
+     * @throws IOException if the stub settings or the stub itself cannot be created
+     */
+    protected SubscriberStub getSubscriber(final ProcessContext context) throws IOException {
+        final String apiEndpoint = context.getProperty(API_ENDPOINT).evaluateAttributeExpressions().getValue();
+
+        final SubscriberStubSettings stubSettings = SubscriberStubSettings.newBuilder()
+                .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
+                .setTransportChannelProvider(getTransportChannelProvider(context))
+                .setEndpoint(apiEndpoint)
+                .build();
+        return GrpcSubscriberStub.create(stubSettings);
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
index 83bb3c6ee9..66d51038a9 100644
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
@@ -1,504 +1,504 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.gcp.pubsub;
-
-import com.google.api.core.ApiFuture;
-import com.google.api.core.ApiFutureCallback;
-import com.google.api.core.ApiFutures;
-import com.google.api.gax.batching.BatchingSettings;
-import com.google.api.gax.core.FixedCredentialsProvider;
-import com.google.api.gax.rpc.ApiException;
-import com.google.api.gax.core.FixedExecutorProvider;
-import com.google.cloud.pubsub.v1.Publisher;
-import com.google.cloud.pubsub.v1.stub.GrpcPublisherStub;
-import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.iam.v1.TestIamPermissionsRequest;
-import com.google.iam.v1.TestIamPermissionsResponse;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Timestamp;
-import com.google.pubsub.v1.ProjectTopicName;
-import com.google.pubsub.v1.PubsubMessage;
-import org.apache.nifi.annotation.behavior.DynamicProperty;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.SystemResource;
-import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.ConfigVerificationResult;
-import org.apache.nifi.components.ConfigVerificationResult.Outcome;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.migration.PropertyConfiguration;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.gcp.pubsub.publish.FlowFileResult;
-import org.apache.nifi.processors.gcp.pubsub.publish.MessageDerivationStrategy;
-import org.apache.nifi.processors.gcp.pubsub.publish.TrackedApiFutureCallback;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.MalformedRecordException;
-import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RecordReaderFactory;
-import org.apache.nifi.serialization.RecordSetWriter;
-import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.record.PushBackRecordSet;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.RecordSet;
-import org.apache.nifi.util.StopWatch;
-import org.threeten.bp.Duration;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.RECORDS_ATTRIBUTE;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.RECORDS_DESCRIPTION;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION;
-
-@SeeAlso({ConsumeGCPubSub.class})
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "publish"})
-@CapabilityDescription("Publishes the content of the incoming flowfile to the 
configured Google Cloud PubSub topic. The processor supports dynamic 
properties." +
-        " If any dynamic properties are present, they will be sent along with 
the message in the form of 'attributes'.")
-@DynamicProperty(name = "Attribute name", value = "Value to be set to the 
attribute",
-        description = "Attributes to be set for the outgoing Google Cloud 
PubSub message", expressionLanguageScope = 
ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-@WritesAttributes({
-        @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = 
MESSAGE_ID_DESCRIPTION),
-        @WritesAttribute(attribute = RECORDS_ATTRIBUTE, description = 
RECORDS_DESCRIPTION),
-        @WritesAttribute(attribute = TOPIC_NAME_ATTRIBUTE, description = 
TOPIC_NAME_DESCRIPTION)
-})
-@SystemResourceConsideration(resource = SystemResource.MEMORY, description = 
"The entirety of the FlowFile's content "
-        + "will be read into memory to be sent as a PubSub message.")
-public class PublishGCPubSub extends AbstractGCPubSubProcessor {
-    private static final List<String> REQUIRED_PERMISSIONS = 
Collections.singletonList("pubsub.topics.publish");
-    private static final String TRANSIT_URI_FORMAT_STRING = "gcp://%s";
-
-    public static final PropertyDescriptor MAX_BATCH_SIZE = new 
PropertyDescriptor.Builder()
-            .name("Input Batch Size")
-            .description("Maximum number of FlowFiles processed for each 
Processor invocation")
-            .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .addValidator(StandardValidators.NUMBER_VALIDATOR)
-            .defaultValue("100")
-            .build();
-
-    public static final PropertyDescriptor MESSAGE_DERIVATION_STRATEGY = new 
PropertyDescriptor.Builder()
-            .name("Message Derivation Strategy")
-            .description("The strategy used to publish the incoming FlowFile 
to the Google Cloud PubSub endpoint.")
-            .required(true)
-            
.defaultValue(MessageDerivationStrategy.FLOWFILE_ORIENTED.getValue())
-            .allowableValues(MessageDerivationStrategy.class)
-            .build();
-
-    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
-            .name("Record Reader")
-            .description("The Record Reader to use for incoming FlowFiles")
-            .identifiesControllerService(RecordReaderFactory.class)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .dependsOn(MESSAGE_DERIVATION_STRATEGY, 
MessageDerivationStrategy.RECORD_ORIENTED.getValue())
-            .required(true)
-            .build();
-
-    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
-            .name("Record Writer")
-            .description("The Record Writer to use in order to serialize the 
data before sending to GCPubSub endpoint")
-            .identifiesControllerService(RecordSetWriterFactory.class)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .dependsOn(MESSAGE_DERIVATION_STRATEGY, 
MessageDerivationStrategy.RECORD_ORIENTED.getValue())
-            .required(true)
-            .build();
-
-    public static final PropertyDescriptor MAX_MESSAGE_SIZE = new 
PropertyDescriptor.Builder()
-            .name("Maximum Message Size")
-            .description("The maximum size of a Google PubSub message in 
bytes. Defaults to 1 MB (1048576 bytes)")
-            .dependsOn(MESSAGE_DERIVATION_STRATEGY, 
MessageDerivationStrategy.FLOWFILE_ORIENTED.getValue())
-            .required(true)
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .defaultValue("1 MB")
-            .build();
-
-    public static final PropertyDescriptor TOPIC_NAME = new 
PropertyDescriptor.Builder()
-            .name("Topic Name")
-            .description("Name of the Google Cloud PubSub Topic")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
-            .build();
-
-    public static final Relationship REL_RETRY = new Relationship.Builder()
-            .name("retry")
-            .description("FlowFiles are routed to this relationship if the 
Google Cloud Pub/Sub operation fails but attempting the operation again may 
succeed.")
-            .build();
-
-    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(
-            GCP_CREDENTIALS_PROVIDER_SERVICE,
-            PROJECT_ID,
-            TOPIC_NAME,
-            MESSAGE_DERIVATION_STRATEGY,
-            RECORD_READER,
-            RECORD_WRITER,
-            MAX_BATCH_SIZE,
-            MAX_MESSAGE_SIZE,
-            BATCH_SIZE_THRESHOLD,
-            BATCH_BYTES_THRESHOLD,
-            BATCH_DELAY_THRESHOLD,
-            API_ENDPOINT,
-            PROXY_CONFIGURATION_SERVICE
-    );
-
-    public static final Set<Relationship> RELATIONSHIPS = Set.of(
-            REL_SUCCESS,
-            REL_FAILURE,
-            REL_RETRY
-    );
-
-    protected Publisher publisher = null;
-
-    @Override
-    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return PROPERTY_DESCRIPTORS;
-    }
-
-    @Override
-    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
-        return new PropertyDescriptor.Builder()
-                .required(false)
-                .name(propertyDescriptorName)
-                .displayName(propertyDescriptorName)
-                
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
-                .dynamic(true)
-                
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-                .build();
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return RELATIONSHIPS;
-    }
-
-    @Override
-    @OnScheduled
-    public void onScheduled(ProcessContext context) {
-        try {
-            publisher = getPublisherBuilder(context).build();
-        } catch (IOException e) {
-            throw new ProcessException("Failed to create Google Cloud PubSub 
Publisher", e);
-        }
-    }
-
-    @Override
-    public List<ConfigVerificationResult> verify(final ProcessContext context, 
final ComponentLog verificationLogger, final Map<String, String> attributes) {
-        final List<ConfigVerificationResult> results = new ArrayList<>();
-        Publisher publisher = null;
-        try {
-            publisher = getPublisherBuilder(context).build();
-            results.add(new ConfigVerificationResult.Builder()
-                    .verificationStepName("Create Publisher")
-                    .outcome(Outcome.SUCCESSFUL)
-                    .explanation("Successfully created Publisher")
-                    .build());
-        } catch (final IOException e) {
-            verificationLogger.error("Failed to create Publisher", e);
-            results.add(new ConfigVerificationResult.Builder()
-                    .verificationStepName("Create Publisher")
-                    .outcome(Outcome.FAILED)
-                    .explanation(String.format("Failed to create Publisher: " 
+ e.getMessage()))
-                    .build());
-        }
-
-        if (publisher != null) {
-            try {
-                final PublisherStubSettings publisherStubSettings = 
PublisherStubSettings.newBuilder()
-                        
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
-                        
.setTransportChannelProvider(getTransportChannelProvider(context))
-                        .build();
-
-                try (GrpcPublisherStub publisherStub = 
GrpcPublisherStub.create(publisherStubSettings)) {
-                    final String topicName = 
context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
-                    final TestIamPermissionsRequest request = 
TestIamPermissionsRequest.newBuilder()
-                            .addAllPermissions(REQUIRED_PERMISSIONS)
-                            .setResource(topicName)
-                            .build();
-                    final TestIamPermissionsResponse response = 
publisherStub.testIamPermissionsCallable().call(request);
-                    if (response.getPermissionsCount() >= 
REQUIRED_PERMISSIONS.size()) {
-                        results.add(new ConfigVerificationResult.Builder()
-                                .verificationStepName("Test IAM Permissions")
-                                
.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
-                                .explanation(String.format("Verified Topic 
[%s] exists and the configured user has the correct permissions.", topicName))
-                                .build());
-                    } else {
-                        results.add(new ConfigVerificationResult.Builder()
-                                .verificationStepName("Test IAM Permissions")
-                                
.outcome(ConfigVerificationResult.Outcome.FAILED)
-                                .explanation(String.format("The configured 
user does not have the correct permissions on Topic [%s].", topicName))
-                                .build());
-                    }
-                }
-            } catch (final ApiException e) {
-                verificationLogger.error("The configured user appears to have 
the correct permissions, but the following error was encountered", e);
-                results.add(new ConfigVerificationResult.Builder()
-                        .verificationStepName("Test IAM Permissions")
-                        .outcome(ConfigVerificationResult.Outcome.FAILED)
-                        .explanation(String.format("The configured user 
appears to have the correct permissions, but the following error was 
encountered: " + e.getMessage()))
-                        .build());
-            } catch (final IOException e) {
-                verificationLogger.error("The publisher stub could not be 
created in order to test the permissions", e);
-                results.add(new ConfigVerificationResult.Builder()
-                        .verificationStepName("Test IAM Permissions")
-                        .outcome(ConfigVerificationResult.Outcome.FAILED)
-                        .explanation(String.format("The publisher stub could 
not be created in order to test the permissions: " + e.getMessage()))
-                        .build());
-
-            }
-        }
-        return results;
-    }
-
-    @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-        final StopWatch stopWatch = new StopWatch(true);
-        final MessageDerivationStrategy inputStrategy = 
MessageDerivationStrategy.valueOf(context.getProperty(MESSAGE_DERIVATION_STRATEGY).getValue());
-        final int maxBatchSize = 
context.getProperty(MAX_BATCH_SIZE).asInteger();
-        final List<FlowFile> flowFileBatch = session.get(maxBatchSize);
-        if (flowFileBatch.isEmpty()) {
-            context.yield();
-        } else if 
(MessageDerivationStrategy.FLOWFILE_ORIENTED.equals(inputStrategy)) {
-            onTriggerFlowFileStrategy(context, session, stopWatch, 
flowFileBatch);
-        } else if 
(MessageDerivationStrategy.RECORD_ORIENTED.equals(inputStrategy)) {
-            onTriggerRecordStrategy(context, session, stopWatch, 
flowFileBatch);
-        } else {
-            throw new IllegalStateException(inputStrategy.getValue());
-        }
-    }
-
-    @Override
-    public void migrateProperties(PropertyConfiguration config) {
-        super.migrateProperties(config);
-        config.renameProperty("gcp-pubsub-topic", TOPIC_NAME.getName());
-    }
-
-    private void onTriggerFlowFileStrategy(
-            final ProcessContext context,
-            final ProcessSession session,
-            final StopWatch stopWatch,
-            final List<FlowFile> flowFileBatch) throws ProcessException {
-        final long maxMessageSize = 
context.getProperty(MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).longValue();
-
-        final Executor executor = MoreExecutors.directExecutor();
-        final List<FlowFileResult> flowFileResults = new ArrayList<>();
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
-        for (final FlowFile flowFile : flowFileBatch) {
-            final List<ApiFuture<String>> futures = new ArrayList<>();
-            final List<String> successes = Collections.synchronizedList(new 
ArrayList<>());
-            final List<Throwable> failures = Collections.synchronizedList(new 
ArrayList<>());
-
-            if (flowFile.getSize() > maxMessageSize) {
-                final String message = String.format("FlowFile size %d exceeds 
MAX_MESSAGE_SIZE", flowFile.getSize());
-                failures.add(new IllegalArgumentException(message));
-                flowFileResults.add(new FlowFileResult(flowFile, futures, 
successes, failures, getLogger()));
-            } else {
-                baos.reset();
-                session.exportTo(flowFile, baos);
-
-                final ApiFuture<String> apiFuture = publishOneMessage(context, 
flowFile, baos.toByteArray());
-                futures.add(apiFuture);
-                addCallback(apiFuture, new TrackedApiFutureCallback(successes, 
failures), executor);
-                flowFileResults.add(new FlowFileResult(flowFile, futures, 
successes, failures, getLogger()));
-            }
-        }
-        finishBatch(session, stopWatch, flowFileResults);
-    }
-
-    private void onTriggerRecordStrategy(
-            final ProcessContext context,
-            final ProcessSession session,
-            final StopWatch stopWatch,
-            final List<FlowFile> flowFileBatch) throws ProcessException {
-        try {
-            onTriggerRecordStrategyPublishRecords(context, session, stopWatch, 
flowFileBatch);
-        } catch (IOException | SchemaNotFoundException | 
MalformedRecordException e) {
-            throw new ProcessException("Record publishing failed", e);
-        }
-    }
-
-    private void onTriggerRecordStrategyPublishRecords(
-            final ProcessContext context,
-            final ProcessSession session,
-            final StopWatch stopWatch,
-            final List<FlowFile> flowFileBatch)
-            throws ProcessException, IOException, SchemaNotFoundException, 
MalformedRecordException {
-        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
-        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-
-        final Executor executor = MoreExecutors.directExecutor();
-        final List<FlowFileResult> flowFileResults = new ArrayList<>();
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
-        for (final FlowFile flowFile : flowFileBatch) {
-            final List<ApiFuture<String>> futures = new ArrayList<>();
-            final List<String> successes = Collections.synchronizedList(new 
ArrayList<>());
-            final List<Throwable> failures = Collections.synchronizedList(new 
ArrayList<>());
-
-            final Map<String, String> attributes = flowFile.getAttributes();
-            try (final RecordReader reader = readerFactory.createRecordReader(
-                    attributes, session.read(flowFile), flowFile.getSize(), 
getLogger())) {
-                final RecordSet recordSet = reader.createRecordSet();
-                final RecordSchema schema = 
writerFactory.getSchema(attributes, recordSet.getSchema());
-
-                final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), schema, baos, attributes);
-                final PushBackRecordSet pushBackRecordSet = new 
PushBackRecordSet(recordSet);
-
-                while (pushBackRecordSet.isAnotherRecord()) {
-                    final ApiFuture<String> apiFuture = 
publishOneRecord(context, flowFile, baos, writer, pushBackRecordSet.next());
-                    futures.add(apiFuture);
-                    addCallback(apiFuture, new 
TrackedApiFutureCallback(successes, failures), executor);
-                }
-                flowFileResults.add(new FlowFileResult(flowFile, futures, 
successes, failures, getLogger()));
-            }
-        }
-        finishBatch(session, stopWatch, flowFileResults);
-    }
-
-    private ApiFuture<String> publishOneRecord(
-            final ProcessContext context,
-            final FlowFile flowFile,
-            final ByteArrayOutputStream baos,
-            final RecordSetWriter writer,
-            final Record record) throws IOException {
-        baos.reset();
-        writer.write(record);
-        writer.flush();
-        return publishOneMessage(context, flowFile, baos.toByteArray());
-    }
-
-    private ApiFuture<String> publishOneMessage(final ProcessContext context,
-                                                final FlowFile flowFile,
-                                                final byte[] content) {
-        final PubsubMessage message = PubsubMessage.newBuilder()
-                .setData(ByteString.copyFrom(content))
-                .setPublishTime(Timestamp.newBuilder().build())
-                .putAllAttributes(getDynamicAttributesMap(context, flowFile))
-                .build();
-        return publisher.publish(message);
-    }
-
-    private void finishBatch(final ProcessSession session,
-                             final StopWatch stopWatch,
-                             final List<FlowFileResult> flowFileResults) {
-        final String topicName = publisher.getTopicNameString();
-        for (final FlowFileResult flowFileResult : flowFileResults) {
-            final Relationship relationship = flowFileResult.reconcile();
-            final Map<String, String> attributes = 
flowFileResult.getAttributes();
-            attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
-            final FlowFile flowFile = 
session.putAllAttributes(flowFileResult.getFlowFile(), attributes);
-            final String transitUri = String.format(TRANSIT_URI_FORMAT_STRING, 
topicName);
-            session.getProvenanceReporter().send(flowFile, transitUri, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-            session.transfer(flowFile, relationship);
-        }
-    }
-
-    protected void addCallback(final ApiFuture<String> apiFuture, final 
ApiFutureCallback<? super String> callback, Executor executor) {
-        ApiFutures.addCallback(apiFuture, callback, executor);
-    }
-
-    @OnStopped
-    public void onStopped() {
-        shutdownPublisher();
-    }
-
-    private void shutdownPublisher() {
-        try {
-            if (publisher != null) {
-                publisher.shutdown();
-            }
-        } catch (Exception e) {
-            getLogger().warn("Failed to gracefully shutdown the Google Cloud 
PubSub Publisher", e);
-        }
-    }
-
-    private ProjectTopicName getTopicName(ProcessContext context) {
-        final String topic = 
context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
-        final String projectId = 
context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
-
-        if (topic.contains("/")) {
-            return ProjectTopicName.parse(topic);
-        } else {
-            return ProjectTopicName.of(projectId, topic);
-        }
-    }
-
-    private Map<String, String> getDynamicAttributesMap(ProcessContext 
context, FlowFile flowFile) {
-        final Map<String, String> attributes = new HashMap<>();
-        for (final Map.Entry<PropertyDescriptor, String> entry : 
context.getProperties().entrySet()) {
-            if (entry.getKey().isDynamic()) {
-                final String value = 
context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue();
-                attributes.put(entry.getKey().getName(), value);
-            }
-        }
-
-        return attributes;
-    }
-
-    private Publisher.Builder getPublisherBuilder(ProcessContext context) {
-        final Long batchSizeThreshold = 
context.getProperty(BATCH_SIZE_THRESHOLD).asLong();
-        final long batchBytesThreshold = 
context.getProperty(BATCH_BYTES_THRESHOLD).asDataSize(DataUnit.B).longValue();
-        final Long batchDelayThreshold = 
context.getProperty(BATCH_DELAY_THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS);
-        final String endpoint = context.getProperty(API_ENDPOINT).getValue();
-
-        final Publisher.Builder publisherBuilder = 
Publisher.newBuilder(getTopicName(context))
-                
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
-                .setChannelProvider(getTransportChannelProvider(context))
-                .setEndpoint(endpoint);
-
-        publisherBuilder.setBatchingSettings(BatchingSettings.newBuilder()
-                .setElementCountThreshold(batchSizeThreshold)
-                .setRequestByteThreshold(batchBytesThreshold)
-                .setDelayThreshold(Duration.ofMillis(batchDelayThreshold))
-                .setIsEnabled(true)
-                .build());
-
-        // Set fixed thread pool executor to number of concurrent tasks
-        publisherBuilder.setExecutorProvider(FixedExecutorProvider.create(new 
ScheduledThreadPoolExecutor(context.getMaxConcurrentTasks())));
-
-        return publisherBuilder;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.batching.BatchingSettings;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.core.FixedExecutorProvider;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.cloud.pubsub.v1.stub.GrpcPublisherStub;
+import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.iam.v1.TestIamPermissionsRequest;
+import com.google.iam.v1.TestIamPermissionsResponse;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.migration.PropertyConfiguration;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.pubsub.publish.FlowFileResult;
+import org.apache.nifi.processors.gcp.pubsub.publish.MessageDerivationStrategy;
+import org.apache.nifi.processors.gcp.pubsub.publish.TrackedApiFutureCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.StopWatch;
+import org.threeten.bp.Duration;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.RECORDS_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.RECORDS_DESCRIPTION;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION;
+
+@SeeAlso({ConsumeGCPubSub.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "publish"})
+@CapabilityDescription("Publishes the content of the incoming flowfile to the 
configured Google Cloud PubSub topic. The processor supports dynamic 
properties." +
+        " If any dynamic properties are present, they will be sent along with 
the message in the form of 'attributes'.")
+@DynamicProperty(name = "Attribute name", value = "Value to be set to the 
attribute",
+        description = "Attributes to be set for the outgoing Google Cloud 
PubSub message", expressionLanguageScope = 
ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+@WritesAttributes({
+        @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = 
MESSAGE_ID_DESCRIPTION),
+        @WritesAttribute(attribute = RECORDS_ATTRIBUTE, description = 
RECORDS_DESCRIPTION),
+        @WritesAttribute(attribute = TOPIC_NAME_ATTRIBUTE, description = 
TOPIC_NAME_DESCRIPTION)
+})
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = 
"The entirety of the FlowFile's content "
+        + "will be read into memory to be sent as a PubSub message.")
+public class PublishGCPubSub extends AbstractGCPubSubProcessor {
+    private static final List<String> REQUIRED_PERMISSIONS = 
Collections.singletonList("pubsub.topics.publish");
+    private static final String TRANSIT_URI_FORMAT_STRING = "gcp://%s";
+
+    public static final PropertyDescriptor MAX_BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Input Batch Size")
+            .description("Maximum number of FlowFiles processed for each 
Processor invocation")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.NUMBER_VALIDATOR)
+            .defaultValue("100")
+            .build();
+
+    public static final PropertyDescriptor MESSAGE_DERIVATION_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("Message Derivation Strategy")
+            .description("The strategy used to publish the incoming FlowFile 
to the Google Cloud PubSub endpoint.")
+            .required(true)
+            
.defaultValue(MessageDerivationStrategy.FLOWFILE_ORIENTED.getValue())
+            .allowableValues(MessageDerivationStrategy.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("Record Reader")
+            .description("The Record Reader to use for incoming FlowFiles")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .dependsOn(MESSAGE_DERIVATION_STRATEGY, 
MessageDerivationStrategy.RECORD_ORIENTED.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("Record Writer")
+            .description("The Record Writer to use in order to serialize the 
data before sending to GCPubSub endpoint")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .dependsOn(MESSAGE_DERIVATION_STRATEGY, 
MessageDerivationStrategy.RECORD_ORIENTED.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor MAX_MESSAGE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Maximum Message Size")
+            .description("The maximum size of a Google PubSub message in 
bytes. Defaults to 1 MB (1048576 bytes)")
+            .dependsOn(MESSAGE_DERIVATION_STRATEGY, 
MessageDerivationStrategy.FLOWFILE_ORIENTED.getValue())
+            .required(true)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("1 MB")
+            .build();
+
+    public static final PropertyDescriptor TOPIC_NAME = new 
PropertyDescriptor.Builder()
+            .name("Topic Name")
+            .description("Name of the Google Cloud PubSub Topic")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .build();
+
+    public static final Relationship REL_RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("FlowFiles are routed to this relationship if the 
Google Cloud Pub/Sub operation fails but attempting the operation again may 
succeed.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(
+            GCP_CREDENTIALS_PROVIDER_SERVICE,
+            PROJECT_ID,
+            TOPIC_NAME,
+            MESSAGE_DERIVATION_STRATEGY,
+            RECORD_READER,
+            RECORD_WRITER,
+            MAX_BATCH_SIZE,
+            MAX_MESSAGE_SIZE,
+            BATCH_SIZE_THRESHOLD,
+            BATCH_BYTES_THRESHOLD,
+            BATCH_DELAY_THRESHOLD,
+            API_ENDPOINT,
+            PROXY_CONFIGURATION_SERVICE
+    );
+
+    public static final Set<Relationship> RELATIONSHIPS = Set.of(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_RETRY
+    );
+
+    protected Publisher publisher = null;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .required(false)
+                .name(propertyDescriptorName)
+                .displayName(propertyDescriptorName)
+                
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+                .dynamic(true)
+                
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .build();
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        try {
+            publisher = getPublisherBuilder(context).build();
+        } catch (IOException e) {
+            throw new ProcessException("Failed to create Google Cloud PubSub 
Publisher", e);
+        }
+    }
+
+    @Override
+    public List<ConfigVerificationResult> verify(final ProcessContext context, 
final ComponentLog verificationLogger, final Map<String, String> attributes) {
+        final List<ConfigVerificationResult> results = new ArrayList<>();
+        Publisher publisher = null;
+        try {
+            publisher = getPublisherBuilder(context).build();
+            results.add(new ConfigVerificationResult.Builder()
+                    .verificationStepName("Create Publisher")
+                    .outcome(Outcome.SUCCESSFUL)
+                    .explanation("Successfully created Publisher")
+                    .build());
+        } catch (final IOException e) {
+            verificationLogger.error("Failed to create Publisher", e);
+            results.add(new ConfigVerificationResult.Builder()
+                    .verificationStepName("Create Publisher")
+                    .outcome(Outcome.FAILED)
+                    .explanation(String.format("Failed to create Publisher: " 
+ e.getMessage()))
+                    .build());
+        }
+
+        if (publisher != null) {
+            try {
+                final PublisherStubSettings publisherStubSettings = 
PublisherStubSettings.newBuilder()
+                        
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
+                        
.setTransportChannelProvider(getTransportChannelProvider(context))
+                        .build();
+
+                try (GrpcPublisherStub publisherStub = 
GrpcPublisherStub.create(publisherStubSettings)) {
+                    final String topicName = 
context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
+                    final TestIamPermissionsRequest request = 
TestIamPermissionsRequest.newBuilder()
+                            .addAllPermissions(REQUIRED_PERMISSIONS)
+                            .setResource(topicName)
+                            .build();
+                    final TestIamPermissionsResponse response = 
publisherStub.testIamPermissionsCallable().call(request);
+                    if (response.getPermissionsCount() >= 
REQUIRED_PERMISSIONS.size()) {
+                        results.add(new ConfigVerificationResult.Builder()
+                                .verificationStepName("Test IAM Permissions")
+                                
.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
+                                .explanation(String.format("Verified Topic 
[%s] exists and the configured user has the correct permissions.", topicName))
+                                .build());
+                    } else {
+                        results.add(new ConfigVerificationResult.Builder()
+                                .verificationStepName("Test IAM Permissions")
+                                
.outcome(ConfigVerificationResult.Outcome.FAILED)
+                                .explanation(String.format("The configured 
user does not have the correct permissions on Topic [%s].", topicName))
+                                .build());
+                    }
+                }
+            } catch (final ApiException e) {
+                verificationLogger.error("The configured user appears to have 
the correct permissions, but the following error was encountered", e);
+                results.add(new ConfigVerificationResult.Builder()
+                        .verificationStepName("Test IAM Permissions")
+                        .outcome(ConfigVerificationResult.Outcome.FAILED)
+                        .explanation(String.format("The configured user 
appears to have the correct permissions, but the following error was 
encountered: " + e.getMessage()))
+                        .build());
+            } catch (final IOException e) {
+                verificationLogger.error("The publisher stub could not be 
created in order to test the permissions", e);
+                results.add(new ConfigVerificationResult.Builder()
+                        .verificationStepName("Test IAM Permissions")
+                        .outcome(ConfigVerificationResult.Outcome.FAILED)
+                        .explanation(String.format("The publisher stub could 
not be created in order to test the permissions: " + e.getMessage()))
+                        .build());
+
+            }
+        }
+        return results;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        final StopWatch stopWatch = new StopWatch(true);
+        final MessageDerivationStrategy inputStrategy = 
MessageDerivationStrategy.valueOf(context.getProperty(MESSAGE_DERIVATION_STRATEGY).getValue());
+        final int maxBatchSize = 
context.getProperty(MAX_BATCH_SIZE).asInteger();
+        final List<FlowFile> flowFileBatch = session.get(maxBatchSize);
+        if (flowFileBatch.isEmpty()) {
+            context.yield();
+        } else if 
(MessageDerivationStrategy.FLOWFILE_ORIENTED.equals(inputStrategy)) {
+            onTriggerFlowFileStrategy(context, session, stopWatch, 
flowFileBatch);
+        } else if 
(MessageDerivationStrategy.RECORD_ORIENTED.equals(inputStrategy)) {
+            onTriggerRecordStrategy(context, session, stopWatch, 
flowFileBatch);
+        } else {
+            throw new IllegalStateException(inputStrategy.getValue());
+        }
+    }
+
+    @Override
+    public void migrateProperties(PropertyConfiguration config) {
+        super.migrateProperties(config);
+        config.renameProperty("gcp-pubsub-topic", TOPIC_NAME.getName());
+    }
+
+    private void onTriggerFlowFileStrategy(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch) throws ProcessException {
+        final long maxMessageSize = 
context.getProperty(MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).longValue();
+
+        final Executor executor = MoreExecutors.directExecutor();
+        final List<FlowFileResult> flowFileResults = new ArrayList<>();
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        for (final FlowFile flowFile : flowFileBatch) {
+            final List<ApiFuture<String>> futures = new ArrayList<>();
+            final List<String> successes = Collections.synchronizedList(new 
ArrayList<>());
+            final List<Throwable> failures = Collections.synchronizedList(new 
ArrayList<>());
+
+            if (flowFile.getSize() > maxMessageSize) {
+                final String message = String.format("FlowFile size %d exceeds 
MAX_MESSAGE_SIZE", flowFile.getSize());
+                failures.add(new IllegalArgumentException(message));
+                flowFileResults.add(new FlowFileResult(flowFile, futures, 
successes, failures, getLogger()));
+            } else {
+                baos.reset();
+                session.exportTo(flowFile, baos);
+
+                final ApiFuture<String> apiFuture = publishOneMessage(context, 
flowFile, baos.toByteArray());
+                futures.add(apiFuture);
+                addCallback(apiFuture, new TrackedApiFutureCallback(successes, 
failures), executor);
+                flowFileResults.add(new FlowFileResult(flowFile, futures, 
successes, failures, getLogger()));
+            }
+        }
+        finishBatch(session, stopWatch, flowFileResults);
+    }
+
+    private void onTriggerRecordStrategy(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch) throws ProcessException {
+        try {
+            onTriggerRecordStrategyPublishRecords(context, session, stopWatch, 
flowFileBatch);
+        } catch (IOException | SchemaNotFoundException | 
MalformedRecordException e) {
+            throw new ProcessException("Record publishing failed", e);
+        }
+    }
+
+    private void onTriggerRecordStrategyPublishRecords(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch)
+            throws ProcessException, IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        final Executor executor = MoreExecutors.directExecutor();
+        final List<FlowFileResult> flowFileResults = new ArrayList<>();
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        for (final FlowFile flowFile : flowFileBatch) {
+            final List<ApiFuture<String>> futures = new ArrayList<>();
+            final List<String> successes = Collections.synchronizedList(new 
ArrayList<>());
+            final List<Throwable> failures = Collections.synchronizedList(new 
ArrayList<>());
+
+            final Map<String, String> attributes = flowFile.getAttributes();
+            try (final RecordReader reader = readerFactory.createRecordReader(
+                    attributes, session.read(flowFile), flowFile.getSize(), 
getLogger())) {
+                final RecordSet recordSet = reader.createRecordSet();
+                final RecordSchema schema = 
writerFactory.getSchema(attributes, recordSet.getSchema());
+
+                final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), schema, baos, attributes);
+                final PushBackRecordSet pushBackRecordSet = new 
PushBackRecordSet(recordSet);
+
+                while (pushBackRecordSet.isAnotherRecord()) {
+                    final ApiFuture<String> apiFuture = 
publishOneRecord(context, flowFile, baos, writer, pushBackRecordSet.next());
+                    futures.add(apiFuture);
+                    addCallback(apiFuture, new 
TrackedApiFutureCallback(successes, failures), executor);
+                }
+                flowFileResults.add(new FlowFileResult(flowFile, futures, 
successes, failures, getLogger()));
+            }
+        }
+        finishBatch(session, stopWatch, flowFileResults);
+    }
+
+    private ApiFuture<String> publishOneRecord(
+            final ProcessContext context,
+            final FlowFile flowFile,
+            final ByteArrayOutputStream baos,
+            final RecordSetWriter writer,
+            final Record record) throws IOException {
+        baos.reset();
+        writer.write(record);
+        writer.flush();
+        return publishOneMessage(context, flowFile, baos.toByteArray());
+    }
+
+    private ApiFuture<String> publishOneMessage(final ProcessContext context,
+                                                final FlowFile flowFile,
+                                                final byte[] content) {
+        final PubsubMessage message = PubsubMessage.newBuilder()
+                .setData(ByteString.copyFrom(content))
+                .setPublishTime(Timestamp.newBuilder().build())
+                .putAllAttributes(getDynamicAttributesMap(context, flowFile))
+                .build();
+        return publisher.publish(message);
+    }
+
+    private void finishBatch(final ProcessSession session,
+                             final StopWatch stopWatch,
+                             final List<FlowFileResult> flowFileResults) {
+        final String topicName = publisher.getTopicNameString();
+        for (final FlowFileResult flowFileResult : flowFileResults) {
+            final Relationship relationship = flowFileResult.reconcile();
+            final Map<String, String> attributes = 
flowFileResult.getAttributes();
+            attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
+            final FlowFile flowFile = 
session.putAllAttributes(flowFileResult.getFlowFile(), attributes);
+            final String transitUri = String.format(TRANSIT_URI_FORMAT_STRING, 
topicName);
+            session.getProvenanceReporter().send(flowFile, transitUri, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            session.transfer(flowFile, relationship);
+        }
+    }
+
+    protected void addCallback(final ApiFuture<String> apiFuture, final 
ApiFutureCallback<? super String> callback, Executor executor) {
+        ApiFutures.addCallback(apiFuture, callback, executor);
+    }
+
+    @OnStopped
+    public void onStopped() {
+        shutdownPublisher();
+    }
+
+    private void shutdownPublisher() {
+        try {
+            if (publisher != null) {
+                publisher.shutdown();
+            }
+        } catch (Exception e) {
+            getLogger().warn("Failed to gracefully shutdown the Google Cloud 
PubSub Publisher", e);
+        }
+    }
+
+    private ProjectTopicName getTopicName(ProcessContext context) {
+        final String topic = 
context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
+        final String projectId = 
context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
+
+        if (topic.contains("/")) {
+            return ProjectTopicName.parse(topic);
+        } else {
+            return ProjectTopicName.of(projectId, topic);
+        }
+    }
+
+    private Map<String, String> getDynamicAttributesMap(ProcessContext 
context, FlowFile flowFile) {
+        final Map<String, String> attributes = new HashMap<>();
+        for (final Map.Entry<PropertyDescriptor, String> entry : 
context.getProperties().entrySet()) {
+            if (entry.getKey().isDynamic()) {
+                final String value = 
context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue();
+                attributes.put(entry.getKey().getName(), value);
+            }
+        }
+
+        return attributes;
+    }
+
+    private Publisher.Builder getPublisherBuilder(ProcessContext context) {
+        final Long batchSizeThreshold = 
context.getProperty(BATCH_SIZE_THRESHOLD).asLong();
+        final long batchBytesThreshold = 
context.getProperty(BATCH_BYTES_THRESHOLD).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue();
+        final Long batchDelayThreshold = 
context.getProperty(BATCH_DELAY_THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String endpoint = 
context.getProperty(API_ENDPOINT).evaluateAttributeExpressions().getValue();
+
+        final Publisher.Builder publisherBuilder = 
Publisher.newBuilder(getTopicName(context))
+                
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
+                .setChannelProvider(getTransportChannelProvider(context))
+                .setEndpoint(endpoint);
+
+        publisherBuilder.setBatchingSettings(BatchingSettings.newBuilder()
+                .setElementCountThreshold(batchSizeThreshold)
+                .setRequestByteThreshold(batchBytesThreshold)
+                .setDelayThreshold(Duration.ofMillis(batchDelayThreshold))
+                .setIsEnabled(true)
+                .build());
+
+        // Set fixed thread pool executor to number of concurrent tasks
+        publisherBuilder.setExecutorProvider(FixedExecutorProvider.create(new 
ScheduledThreadPoolExecutor(context.getMaxConcurrentTasks())));
+
+        return publisherBuilder;
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
index e5a8f1eaaf..76a3da1e72 100644
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
@@ -43,6 +43,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static 
org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_DESC;
+
 /**
  * Base class for creating processors which connect to Google Cloud Storage.
  *
@@ -68,6 +70,14 @@ public abstract class AbstractGCSProcessor extends 
AbstractGCPProcessor<Storage,
         return RELATIONSHIPS;
     }
 
+    public static final PropertyDescriptor BUCKET = new 
PropertyDescriptor.Builder()
+            .name("Bucket")
+            .description(BUCKET_DESC)
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
     // 
https://cloud.google.com/storage/docs/request-endpoints#storage-set-client-endpoint-java
     public static final PropertyDescriptor STORAGE_API_URL = new 
PropertyDescriptor.Builder()
             .name("Storage API URL")
@@ -137,7 +147,7 @@ public abstract class AbstractGCSProcessor extends 
AbstractGCPProcessor<Storage,
     protected abstract List<String> getRequiredPermissions();
 
     protected String getBucketName(final ProcessContext context, final 
Map<String, String> attributes) {
-        return 
context.getProperty("gcs-bucket").evaluateAttributeExpressions(attributes).getValue();
+        return 
context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
     }
 
     @Override
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/DeleteGCSObject.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/DeleteGCSObject.java
index 61dc11f5ac..31ab641f1e 100644
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/DeleteGCSObject.java
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/DeleteGCSObject.java
@@ -37,7 +37,6 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_ATTR;
-import static 
org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_DESC;
 import static 
org.apache.nifi.processors.gcp.storage.StorageAttributes.KEY_DESC;
 
 
@@ -49,12 +48,9 @@ import static 
org.apache.nifi.processors.gcp.storage.StorageAttributes.KEY_DESC;
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 public class DeleteGCSObject extends AbstractGCSProcessor {
     public static final PropertyDescriptor BUCKET = new 
PropertyDescriptor.Builder()
-            .name("Bucket")
-            .description(BUCKET_DESC)
-            .required(true)
+            .fromPropertyDescriptor(AbstractGCSProcessor.BUCKET)
             .defaultValue("${" + BUCKET_ATTR + "}")
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor KEY = new 
PropertyDescriptor.Builder()
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
index e900e6d1ce..a9b5dd9915 100644
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
@@ -169,12 +169,9 @@ import static 
org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_DESC;
 )
 public class FetchGCSObject extends AbstractGCSProcessor {
     public static final PropertyDescriptor BUCKET = new 
PropertyDescriptor.Builder()
-            .name("Bucket")
-            .description(BUCKET_DESC)
-            .required(true)
+            .fromPropertyDescriptor(AbstractGCSProcessor.BUCKET)
             .defaultValue("${" + BUCKET_ATTR + "}")
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor KEY = new 
PropertyDescriptor.Builder()
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
index 3f0ba5386b..b3d021ac57 100644
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
@@ -221,14 +221,6 @@ public class ListGCSBucket extends AbstractGCSProcessor {
         .required(true)
         .build();
 
-    public static final PropertyDescriptor BUCKET = new 
PropertyDescriptor.Builder()
-            .name("Bucket")
-            .description(BUCKET_DESC)
-            .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
-            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
-            .build();
-
     public static final PropertyDescriptor PREFIX = new 
PropertyDescriptor.Builder()
             .name("Prefix")
             .description("The prefix used to filter the object list. In most 
cases, it should end with a forward slash ('/').")
@@ -409,11 +401,6 @@ public class ListGCSBucket extends AbstractGCSProcessor {
         return Collections.singletonList("storage.objects.list");
     }
 
-    @Override
-    protected String getBucketName(final ProcessContext context, final 
Map<String, String> attributes) {
-        return 
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
-    }
-
     @Override
     public List<ConfigVerificationResult> verify(final ProcessContext context, 
final ComponentLog verificationLogger, final Map<String, String> attributes) {
         final List<ConfigVerificationResult> results = new 
ArrayList<>(super.verify(context, verificationLogger, attributes));
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
index 7c59e43dd4..e54b79e2bc 100644
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
@@ -148,12 +148,9 @@ import static 
org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileR
 })
 public class PutGCSObject extends AbstractGCSProcessor {
     public static final PropertyDescriptor BUCKET = new 
PropertyDescriptor.Builder()
-        .name("Bucket")
-        .description(BUCKET_DESC)
-        .required(true)
+        .fromPropertyDescriptor(AbstractGCSProcessor.BUCKET)
         .defaultValue("${" + BUCKET_ATTR + "}")
         
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
 
     public static final PropertyDescriptor KEY = new 
PropertyDescriptor.Builder()
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveIT.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveIT.java
index 03810ad661..276aafcd3b 100644
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveIT.java
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveIT.java
@@ -27,6 +27,7 @@ import com.google.api.services.drive.DriveScopes;
 import com.google.api.services.drive.model.File;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.processor.Processor;
+import 
org.apache.nifi.processors.gcp.credentials.factory.AuthenticationStrategy;
 import 
org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
 import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
 import org.apache.nifi.processors.gcp.util.GoogleUtils;
@@ -106,6 +107,7 @@ public abstract class AbstractGoogleDriveIT<T extends 
GoogleDriveTrait & Process
         GCPCredentialsControllerService gcpCredentialsControllerService = new 
GCPCredentialsControllerService();
         testRunner.addControllerService("gcp_credentials_provider_service", 
gcpCredentialsControllerService);
 
+        testRunner.setProperty(gcpCredentialsControllerService, 
CredentialPropertyDescriptors.AUTHENTICATION_STRATEGY, 
AuthenticationStrategy.SERVICE_ACCOUNT_JSON_FILE);
         testRunner.setProperty(gcpCredentialsControllerService, 
CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE, 
CREDENTIAL_JSON_FILE_PATH);
         testRunner.enableControllerService(gcpCredentialsControllerService);
         testRunner.setProperty(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE, 
"gcp_credentials_provider_service");
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubIT.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubIT.java
index 933e6789bd..a19b7ea47b 100644
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubIT.java
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubIT.java
@@ -1,47 +1,48 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.gcp.pubsub;
-
-import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.util.TestRunner;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class AbstractGCPubSubIT {
-
-    protected static final String PROJECT_ID = "my-gcm-client";
-    protected static final String CONTROLLER_SERVICE = "GCPCredentialsService";
-    protected static TestRunner runner;
-
-    protected TestRunner setCredentialsCS(TestRunner runner) throws 
InitializationException {
-        final String serviceAccountJsonFilePath = "path/to/credentials/json";
-        final Map<String, String> propertiesMap = new HashMap<>();
-        final GCPCredentialsControllerService credentialsControllerService = 
new GCPCredentialsControllerService();
-
-        propertiesMap.put("application-default-credentials", "false");
-        propertiesMap.put("compute-engine-credentials", "false");
-        propertiesMap.put("service-account-json-file", 
serviceAccountJsonFilePath);
-
-        runner.addControllerService(CONTROLLER_SERVICE, 
credentialsControllerService, propertiesMap);
-        runner.enableControllerService(credentialsControllerService);
-        runner.assertValid(credentialsControllerService);
-
-        return runner;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import 
org.apache.nifi.processors.gcp.credentials.factory.AuthenticationStrategy;
+import 
org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
+import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AbstractGCPubSubIT {
+
+    protected static final String PROJECT_ID = "my-gcm-client";
+    protected static final String CONTROLLER_SERVICE = "GCPCredentialsService";
+    protected static TestRunner runner;
+
+    protected TestRunner setCredentialsCS(TestRunner runner) throws 
InitializationException {
+        final String serviceAccountJsonFilePath = "path/to/credentials/json";
+        final Map<String, String> propertiesMap = new HashMap<>();
+        final GCPCredentialsControllerService credentialsControllerService = 
new GCPCredentialsControllerService();
+
+        
propertiesMap.put(CredentialPropertyDescriptors.AUTHENTICATION_STRATEGY.getName(),
 AuthenticationStrategy.SERVICE_ACCOUNT_JSON_FILE.getValue());
+        
propertiesMap.put(CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE.getName(),
 serviceAccountJsonFilePath);
+
+        runner.addControllerService(CONTROLLER_SERVICE, 
credentialsControllerService, propertiesMap);
+        runner.enableControllerService(credentialsControllerService);
+        runner.assertValid(credentialsControllerService);
+
+        return runner;
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/AbstractGCSIT.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/AbstractGCSIT.java
index c50b365429..34bbf97918 100644
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/AbstractGCSIT.java
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/AbstractGCSIT.java
@@ -24,6 +24,8 @@ import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageException;
 import com.google.cloud.storage.testing.RemoteStorageHelper;
 import org.apache.nifi.processor.Processor;
+import 
org.apache.nifi.processors.gcp.credentials.factory.AuthenticationStrategy;
+import 
org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
 import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -94,6 +96,7 @@ public abstract class AbstractGCSIT {
         final GCPCredentialsControllerService credentialsControllerService = 
new GCPCredentialsControllerService();
         final TestRunner runner = TestRunners.newTestRunner(processor);
         runner.addControllerService("gcpCredentialsControllerService", 
credentialsControllerService);
+        runner.setProperty(credentialsControllerService, 
CredentialPropertyDescriptors.AUTHENTICATION_STRATEGY, 
AuthenticationStrategy.APPLICATION_DEFAULT);
         runner.enableControllerService(credentialsControllerService);
 
         
runner.setProperty(AbstractGCSProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, 
"gcpCredentialsControllerService");
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectIT.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectIT.java
index 18e817284b..c6a812ad59 100644
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectIT.java
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectIT.java
@@ -87,9 +87,9 @@ public class FetchGCSObjectIT extends AbstractGCSIT {
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("filename", KEY);
         final List<ConfigVerificationResult> results = 
processor.verify(runner.getProcessContext(), runner.getLogger(), attributes);
-        assertEquals(2, results.size());
+        assertEquals(3, results.size());
         assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, 
results.get(1).getOutcome());
-        assertTrue(results.get(1).getExplanation().matches("Successfully 
fetched \\[delete-me\\] from Bucket \\[gcloud-test-bucket-temp-.*\\], totaling 
3 bytes"));
+        assertTrue(results.get(2).getExplanation().matches("Successfully 
fetched \\[delete-me\\] from Bucket \\[gcloud-test-bucket-temp-.*\\], totaling 
3 bytes"));
 
         runner.assertAllFlowFilesTransferred(FetchGCSObject.REL_SUCCESS, 1);
         final List<MockFlowFile> ffs = 
runner.getFlowFilesForRelationship(FetchGCSObject.REL_SUCCESS);

Reply via email to